#!/usr/bin/perl

# padb. a simple parallel debugging aid from Quadrics.

# For help and support visit http://padb.pittman.org.uk
# or email padb-users@pittman.org.uk

# Copyright (C) 2005-2007 Quadrics.
# Copyright (C) 2009 Ashley Pittman.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

# Revision history
#
# Version 2.5
#  * First Non-Quadrics version
#  * Various stability/bug fixes.
#  * Deadlock detect at the MPI Layer rather than the Elan layer
#    if running with a patched MPI (Work in progress)
#  * Completely new build and packing procedure to go with the new
#    maintainer.
#  * Added "orte" to the list of resource managers supported
#  * Don't enable local-qsnet on non-qsnet systems.
#  * inner_main() now uses callbacks for resource manager support.
#  * --signal now takes names rather than numbers.
#  * Check job is valid when using the --full-report option.
#  * Add a --proc-summary option to replace --proc-info --proc-format
#    This gives a very efficient "job top" program.

# Version 2.2
#  * Add a --core-stack option along with --core and --exe to extract stack
#    traces from core files.
#
# Version 2.1
#  * Add some magic to return complex perl data structures back from the inner
#    callback functions to the output callback function. (nfreeze/base64_encode).
#  * Add "MPI watch" functionality to allow viewing of MPI state in a vmstat
#    like fashion.
#  * Add a --list-rmgrs option to list active resource managers and their jobs.
#  * Add support for "local-qsnet" as a way of launching jobs.
#  * Add support for "local-fd" as a way of launching jobs.
#  * Add support for "mpd" as a way of launching jobs.
#  * Add support for "lsf-rms" as a way of launching jobs.  Note the lsf/rms
#    integration means this is highly unlikely to work for everyone.
#  * Add a -Olsf-job-offset option for finding lsf jobs in the rms database.
#  * Support for MPI message queues as well as libelan queues (-Q)
#  * Add a -Ominfo=<exe> option for finding the new minfo.x command.
#  * Add a -Ompi-dll=<dll.so> option for overriding the debugger dll.
#  * Extend the gdb handling code to allow further expansion in the future.
#  * Make the strip-below and strip-above functions configurable.
#  * Add support for loading settings from the environment or a config file.
#  * Add support for "local" as a resource manager to target hand-crafted capabilities.
#  * Ignore case when matching stats names.
#  * Correct printing of debug information from the inner.
#  * Try and remove warnings when run with -w (still disabled)
#  * Un-break it on slurm systems without RMS installed.
#  * Preliminary threading support (courtesy of LLNL)
#  * Show per-rail sdram usage of processes.
#  * Look at all descendant processes of slurmstepd rather than direct descendants
#    and try and avoid scripts (perl/sh/bash/xterm)
#  * Use the new scontrol listpids and %A options to squeue for finding processes
#    on slurm systems (1.2.0 and above).
#  * Don't show usage on command line errors.
#  * Only pass command line options onto the inner if it is going to handle them
#
# Version 2.0
#  * Removed the -OscriptDir option as it's no longer used, use -Oedb instead.
#  * Corrected the way tally statistics were being added.
#  * Added a --show-jobs option to show possible jobs in a resource manager
#    independent way.
#  * Added a --local-stats option to show message statistics for all processes
#    on the local node.
#  * Added a --proc-format option which allows specific entries from /proc to be
#    reported on a per-vp basis.
#  * Ported to slurm only systems where the RMS kernel module isn't present.
#  * Removed the padb-helper.pl file and folded it's functionality into padb
#    itself. Padb is now self-contained.
#  * Removed the padb.gdb file from the kit, it's generated at run-time if
#    needed.
#  * Various readability fixes and small performance improvements.
#  * Added a --kill option along with --signal which can be used to send signals
#    to any process in the parallel job.
#
# Version 1.6
#  * Add a --proc-info option to show the contents of files from /proc for a
#    given rank
#  * Increase the RMS_EXITTIMEOUT value from 10 to 30 seconds and make it a
#    configuration option.
#
# Version 1.5
#  * Try and load edb from where padb is installed. This should allow it to run
#    on elan3 systems where the qsnetdefault link is set to elan3.
#  * GNAT:8110 Proper use of English in the group deadlock detection report.
#  * Target the correct process if there are multiple processes for each vp.
#    Use the pid of the process which called elan_baseInit()
#  * GNAT 7945: Fix messages on stderr about integer overflow on 32 bit machines
#  * Remove warnings when -w is turned on.
#  * Re-work the stack trace tree generation code do work via a intermediate
#    data structure to make the code easier to parse.
#  * Report errors loading stats from a running job cleanly.
#  * Better backwards compatibility with older RMS releases.
#  * Add a padb-treeview script to the release, this takes most of it's code
#    from padb and uses tk to provide the user with a X based view of the stack
#    traces.
#  * Changes to edb so the stats loading code can run on elan3 systems.
#
# Version 1.4
#  * Bumped version number to 1.4
#  * Change the format of tree based stack traces, it now uses a more logical
#    indention style.
#  * Discover and report if application stats are incomplete.
#  * Allow the use of -r with -s to view statistics from an individual process.
#    Update -S (which does the same thing) to parse the stats in padb as well.
#  * Improved error handling in the case where jobs complete whilst padb is
#    running, sample the job state before and after going parallel and do the
#    right thing accordingly.
#  * Much improved error output, only report an error if something bad happened.
#  * Changes to the code as required to enable padb to run cleanly with
#    warnings (-w) enabled.
#  * Added a -Ostats-name= option to allow the extraction of one specific
#    statistic from the command line.
#  * Create separate file descriptors for stdout and stderr when running in
#    parallel to make it more resilient.
#
# Version 1.3
#  * Strip stack traces below main when possible, add a --nostrip-below-main
#    option to turn this off.
#  * Strip stack traces above elan_waitWord when possible, add a
#    --nostrip-above-wait option to turn this off.
#  * Added a -Ogdb-retry-count=N option. Defaults to three as before but is now
#    tunable.
#  * Parse communication statistics in padb directly now rather then relying on
#    edb to do it for us
#  * Allow reading of stats from file (-s -i )
#  * Perform group deadlock detection in padb directly rather than in edb,
#    improved the output and handling of corner cases.
#  * Initial version of a "one process per line" method of statistics reporting.
#  * Better catching and reporting of errors when running parallel programs.
#  * Bumped the version number to 1.3
#
# Version 1.2
#  * Converted padb to use long command line options. The short ones still work
#    but now have long alternatives
#  * Removed the need to set -OscriptDir= when running in non-default locations
#  * Added a --full-report=<jobId> option to gather all possible information
#  * General tidy ups in the stack trace tree generation code.
#  * Now reports processes that aren't present when generating stack traces.
#  * Now reports errors properly when there are errors launching the parallel
#    job
#  * Calls edb directly rather than using a helper script when possible
#    (statistics reports).
#  * Incremented version number from 1.0 to 1.2.
#

# TODO:
#
# * More testing with -w turned on.
# * Multi-pass argument handling, --kill also accepts --signal for example,
#   this should really be done at the getopt layer.  Also proper usage
#   info for these secondary args.
# * slurm_find_pids() has some good code in it for finding parallel processes
#   this should be extrapolated out and so it can be used in the mpd case,
#   ideally on non-rms systems (RMS rocks in this regard) the rmgr callback
#   should return a list of spawned pids and the code in slurm_find_pids() should
#   pass this tree to find the most interesting one.
# * The mode {handler} functions should only be called once per node, it could then
#   correctly handle $confInner{gdb_file} and also attach to every process per node
#   simultaneously, this would help stack trace and message queue support as doing
#   then one at a time results in weird artifacts.  (Done for stack traces but not
#   message queues).
# * Output parsing, {out_handler} is a good start but in stack traces the tree
#   format is optional, maybe have the secondary arg have a {out_hander} attached?
# * libunwind support?  lighter weight than gdb and possibly more reliable.
# * --watch should launch one persistent parallel job rather than a new one every go,
#   this would require full-duplex comms between inner and outer however so could
#   present scaling problems.  Maybe PMI would help here?
# * POD? generated man page?
# * mode specific defaults, for example --mpi-watch should enable --watch
#   -Owatch-clears-screen=0
# * Make -q fallback to -Q if tports are not available
# * ???

###############################################################################

use strict;
use Getopt::Long;
use File::Basename;
use IPC::Open3;
use Cwd;
use Data::Dumper;
use Storable qw(dclone nfreeze thaw);
use Sys::Hostname;
use File::Temp qw(tempfile);
use MIME::Base64;
use Config;

###############################################################################
#
# Header.
#
###############################################################################

# Formatted with the command 'perltidy -b -ce -w padb' to maintain a
# vaguely readable form.

# This (large) source file contains a number of loosely separated segments,
# namely...

# Header.
# Resource manager setup
# Config options and defaults
# Usage and version
# Globals
# Elan statistics.
# Group deadlock detection
# Local (per node) stats.
# Stack trace tree compression.
# RMS support.
# Slurm support.
# Resource manager support.
# Output formatting
# Data collection (parallel and from file).
# Outer main
# Inner
# Main.

my $prog    = basename $0;
my $version = "2.5";

my %conf;

# Config options the inner knows about, only forward options if they are in this list.
my @inner_conf = qw(edb edbopt minfo rmgr scripts slurm-job-step verbose);

###############################################################################
#
# Resource manager setup
#
###############################################################################

# A hash of supported resource managers, each of which provides a number of
# functions for querying the state of the machine.  This keeps the core
# of the code tidy.   Note that this is only for the "outer" instance of the
# program, the inner version does things differently.

# Function        Args Returns   Required Description
# is_installed    -    Bool      yes      Check for being installed and running.
# get_active_jobs user List      yes      Return list of all active job for user.
# is_job_running  job  Bool      no       Check if a given job is running.
# job_to_key      job  key       no       Convert from jobId to shm key.
# setup_pcmd      job  cmd|ncpus yes      Command needed to launch shadow jobs.
# cleanup_pcmd    -    -         no       Cleans up and temporary files.
# find_pids       job  -         maybe    Called on the inner to locate pids.

# inner_rmgr      var  n/a       no       Resource manager to masquerade as.

my %rmgr;

$rmgr{"rms"} = {
    'is_installed'    => \&rms_is_installed,
    'get_active_jobs' => \&rms_get_jobs,
    'job_is_running'  => \&rms_job_is_running,
    'job_to_key'      => \&rms_job_to_key,
    'setup_pcmd'      => \&rms_setup_pcmd,
    'find_pids'       => \&rms_find_pids,
};

$rmgr{"mpd"} = {
    'is_installed'    => \&mpd_is_installed,
    'get_active_jobs' => \&mpd_get_jobs,
    'setup_pcmd'      => \&mpd_setup_pcmd,
    'cleanup_pcmd'    => \&mpd_cleanup_pcmd,
    'find_pids'       => \&mpd_find_pids,
};

$rmgr{"orte"} = {
    'is_installed'    => \&open_is_installed,
    'get_active_jobs' => \&open_get_jobs,
    'setup_pcmd'      => \&open_setup_pcmd,
    'cleanup_pcmd'    => \&open_cleanup_pcmd,
    'find_pids'       => \&open_find_pids,
};

$rmgr{"lsf-rms"} = {
    'is_installed'    => \&lsf_is_installed,
    'get_active_jobs' => \&lsf_get_jobs,
    'setup_pcmd'      => \&lsf_setup_pcmd,
    'inner_rmgr'      => "rms",
};

$rmgr{"slurm"} = {
    'is_installed'    => \&slurm_is_installed,
    'get_active_jobs' => \&slurm_get_jobs,
    'job_is_running'  => \&slurm_job_is_running,
    'setup_pcmd'      => \&slurm_setup_pcmd,
    'find_pids'       => \&slurm_find_pids,
};

$rmgr{"local"} = {
    'get_active_jobs' => \&local_get_jobs,
    'job_is_running'  => \&local_job_is_running,
    'setup_pcmd'      => \&local_setup_pcmd,
    'find_pids'       => \&local_find_pids,
};

$rmgr{"local-qsnet"} = {
    'is_installed'    => \&local_q_is_installed,
    'get_active_jobs' => \&local_q_get_jobs,
    'job_is_running'  => \&local_job_is_running,
    'setup_pcmd'      => \&local_setup_pcmd,
    'inner_rmgr'      => "local",
};

$rmgr{"local-fd"} = {
    'get_active_jobs' => \&local_fd_get_jobs,
    'job_is_running'  => \&local_job_is_running,
    'setup_pcmd'      => \&local_setup_pcmd,
    'inner_rmgr'      => "local",
};

###############################################################################
#
# Config options
#
###############################################################################

# If changing any of these defaults also check the inner code as some
# of these settings affect that program as well and padb will only
# pass on settings on the command line, not the entire config hash.
# The reason they are listed here as well is so that padb -O help
# works and gives the correct defaults.

my %allfns;

my $line_formatted = 0;
my $countoutput    = 0;

my %cinner;    # Config options to be passed to inner.
my $rem_jobid;

# Debug options.
$conf{"verbose"}      = 0;
$conf{"tree-verbose"} = 0;
$conf{"dump-raw"}     = 0;
$conf{"showcmd"}      = 0;

$conf{slurm_job_step} = "0";

# Output options.
$conf{"stats-sort-key"}      = "vp";
$conf{"proc-sort-key"}       = "vp";
$conf{"proc-show-header"}    = 1;
$conf{"stats-reverse"}       = 0;
$conf{"stats-short"}         = 0;
$conf{"show-group-members"}  = 0;
$conf{"show-all-stats"}      = 0;
$conf{"show-all-groups"}     = 0;
$conf{"interval"}            = 10;
$conf{"watch-clears-screen"} = 1;
$conf{"stats-name"}          = undef;
$conf{"stats-raw"}           = 0;
$conf{"scripts"}             = "bash,sh,dash,ash,perl,xterm";
$conf{"stack-strip-below"}   = "main";
$conf{"lsf-job-offset"}      = 1;
$conf{"local-fd-name"}       = "/dev/null";
$conf{"stack-strip-above"} =
  "elan_waitWord,elan_pollWord,elan_deviceCheck,opal_condition_wait";

# $conf{stack-format}        = undef;

# Tuning options.
$conf{"prun-timeout"}     = 120;
$conf{"prun-exittimeout"} = 120;
$conf{"rmgr"}             = "auto";

# These settings are passed onto inner only.
$conf{"edbopt"}  = "";
$conf{"mpi-dll"} = "auto";

$conf{"edb"}   = find_edb();
$conf{"minfo"} = find_minfo();

my $norc       = 0;
my $configfile = "/etc/padb.conf";

# Look for edb in the default install location only.
sub find_edb {
    return "/usr/lib/qsnet/elan4/bin/"
      if ( -d "/usr/lib/qsnet/elan4/bin/" );
    return "/usr/lib64/qsnet/elan4/bin/"
      if ( -d "/usr/lib64/qsnet/elan4/bin/" );
    return "edb";
}

# Look for minfo.x in the same directory as padb.
sub find_minfo {
    my $dir = dirname($0);
    return "$dir/minfo.x";
}

###############################################################################
#
# usage and version.
#
###############################################################################

sub show_version {
    printf("$prog version $version\n\n");
    printf("Written by Ashley Pittman\n");

    #ashley@quadrics.com
    exit 0;
}

my $usage = <<EOF;
Usage: padb [-hv] [-c|-C|-t] -g|-q|-s|-x|-X [-O <opt>=<val>
            [,<opt>=<val>...]] [-i <file>] [-r <rank>] [-u <user>]
            -a|-A|<jobid ...>

-a --all               report on all running jobs for user.
-A --any               report on a running job for user.
-u --user=USER         report on jobs for username=<user>.

-r --rank=RANK         report only on processes <RANK>.
   --group-id=ID       report only on group <ID>.

-s --statistics        Show the job-wide statistics.
-g --group             Show the state of collective operations (groups).
XXXX
   --full-report=JOBID All of the above.

   --nostrip-below-main Don\'t strip stack traces below main.
   --nostrip-above-wait Don\'t strip stack traces about elan_waitWord.

   --proc-format       Specify information to show about processes.

-c --compress          Use dshbac -c format.
-C --compress-long     Use other dshbak format.
-t --tree              Use tree based output for stack traces.
-i --input-file=FILE   Read input from file.

   --watch             

-O [opt1=val,<opt2=val>] Set internal config options for padb, advanced use only.
  Options in this version (these are liable to change)
  Use -Ohelp for showing current settings

  General options:
  verbose              Set verbosity level.
  edb                  Full path to edb

  Slurm only options
  slurm-job-step       Job step to target.

  RMS only options
  prun-timeout         Timeout to use when launching parallel job.

  Stack trace options:
  tree-verbose         turn on debugging for the stack trace tree generation code.
  gdb-retry-count      Number of times to try getting a 'good' stack trace from gdb.
  stack-show-params    Show function parameters in stack traces.
  stack-show-locals    Show locals in stack traces.

  Statistics options:
  stats-short          Turn on "one process per line" stats reporting code.
  stats-sort-key       Sort stats by <key>.
  stats-reverse        Reverse order when showing stats.
  stats-name           Only report the value of a single stat.

  Group deadlock detection options:
  show-group-members   Show group to vp translations in the group code.
  show-all-groups      Report on all groups in a job.

  Watch options:
  interval             Refresh rate.

-v --verbose           Verbose.
-V --version           Show version number and exit.
-h --help              print this usage message.
EOF

sub usage {
    chomp $usage;

    my $extra = "";
    foreach my $arg ( sort( keys %allfns ) ) {
        next unless ( defined $allfns{$arg}{help} );
        if ( defined $allfns{$arg}{arg_short} ) {
            $extra .= "-$allfns{$arg}{arg_short}";
        } else {
            $extra .= "  ";
        }
        $extra .= sprintf( " --%-18s%s.\n",
            $allfns{$arg}{arg_long},
            $allfns{$arg}{help} );
    }

    $usage =~ s!XXXX!$extra!;

    print STDERR <<EOF;
$usage
EOF
    exit 1;
}

###############################################################################
#
# Globals.
#
###############################################################################

my $user = getpwuid($<);
my @ranks;
my @target_groups;
my $all;
my $any;

# Number of functions provided on the command line from the allfns hash.
my $have_allfns_option = 0;

my $stats_total;
my $group;

my $full_report;
my $core_stack;
my $list_rmgrs;
my $watch;
my $local_stats;
my $show_jobs;

my $core_name;
my $exe_name;

my $proc_format =
  "vp=vpid,hostname,pid,vmsize,vmrss,stat.state=S,pcpu=%cpu,name=command";

my $input_file;
my $compress;
my $compress_C;
my $tree;

my $strip_below_main = 1;
my $strip_above_wait = 1;

my @config_options;
my %ic_names;

# Populated in the outer args section so that outer code
# can access secondary comamnd line argunments by name.
my %secondary_args;

sub parse_args_outer {

    Getopt::Long::Configure("bundling");
    my $mode;

    my %optionhash = (
        "verbose|v+"                     => \$conf{verbose},
        "user|u=s"                       => \$user,
        "rank|r=i"                       => \@ranks,
        "group-id=i"                     => \@target_groups,
        "help|h"                         => \&usage,
        "all|a"                          => \$all,
        "any|A"                          => \$any,
        "statistics-total|stat|sta|st|s" => \$stats_total,
        "version|V"                      => \&show_version,
        "compress|c"                     => \$compress,
        "compress-long|C"                => \$compress_C,
        "group|g"                        => \$group,
        "tree|t"                         => \$tree,
        "input-file|file|i=s"            => \$input_file,
        "config-option|O=s"              => \@config_options,
        "full-report=s"                  => \$full_report,
        "core-stack"                     => \$core_stack,
        "core=s"                         => \$core_name,
        "exe=s"                          => \$exe_name,
        "list-rmgrs"                     => \$list_rmgrs,
        "strip-below-main!"              => \$strip_below_main,
        "strip-above-wait!"              => \$strip_above_wait,
        "watch!"                         => \$watch,
        "local-stats"                    => \$local_stats,
        "proc-format=s"                  => \$proc_format,
        "show-jobs"                      => \$show_jobs,
        "norc"                           => \$norc,
        "config-file=s"                  => \$configfile
    );

    my %config_hash;
    foreach my $arg ( keys %allfns ) {
        $optionhash{ $allfns{$arg}{arg} } = \$config_hash{$arg};
        if ( defined $allfns{$arg}{secondary} ) {
            foreach my $sec ( @{ $allfns{$arg}{secondary} } ) {
                $sec->{value} = $sec->{default};
                $optionhash{ $sec->{arg} } = \$sec->{value};
            }
        }
        if ( defined $allfns{$arg}{options_i} ) {
            foreach my $o ( keys( %{ $allfns{$arg}{options_i} } ) ) {
                $conf{$o} = $allfns{$arg}{options_i}{$o};
                $ic_names{$o}++;
            }
        }
    }

    GetOptions(%optionhash) or exit(1);

    foreach my $arg ( keys %config_hash ) {
        next unless defined $config_hash{$arg};
        $mode = $arg;
        $have_allfns_option++;
    }

    # Put the args in a hash so that they can be referenced by name.
    if ( defined $allfns{$mode}{secondary} ) {
        foreach my $sec ( @{ $allfns{$mode}{secondary} } ) {
            $secondary_args{ $sec->{arg_long} } = $sec->{value};
        }
    }

    return $mode;
}

###############################################################################
#
# Elan statistics.
#
###############################################################################

# Work around problems with the "hex" function and whilst we are
# at it avoid warnings as well.
# Unfortunately hex can't deal with anything bigger than 2^31 without
# giving an error so simply +0 on the string to convert it to a int
# cleanly (GNAT 7945).
sub _hex {
    my $str = shift;
    if ( not defined $str ) {
        return 0;
    } elsif ( $str eq "0xffffffffffffffff" ) {
        return -1;
    } else {

        if ( length $str < 10 ) {
            return hex($str);
        }

        # It was hard to write, it's supposed to be hard to read.

        $str =~ s/\A0x//;
        my $lower = hex( "0x" . substr( "0" x 8 . $str, -8 ) );
        my $upper = hex( "0x" . substr( "0" x 16 . $str, -16, 8 ) );
        $lower += ( 0x10000000 * 0x10 * $upper );

        return $lower;
    }
}

sub sum_attr {
    my ( $current, $sum_so_far ) = @_;

    if ( defined $sum_so_far->{'raw'}[0]
        and $sum_so_far->{'raw'}[0] != $current->{'raw'}[0] )
    {
        $sum_so_far->{'raw'}[0] = undef;
    }

    return $sum_so_far;
}

sub sum_bin {
    my ( $current, $sum_so_far ) = @_;

    for ( my $j = 0 ; $j < 32 ; $j++ ) {
        $sum_so_far->{'raw'}[$j] += $current->{'raw'}[$j];
    }

    #check min
    if (
        ( $sum_so_far->{'raw'}[32] == -1 )
        or (    ( $current->{'raw'}[32] != -1 )
            and ( $current->{'raw'}[32] < $sum_so_far->{'raw'}[32] ) )
      )
    {
        $sum_so_far->{'raw'}[32] = $current->{'raw'}[32];
    }

    #check max
    if ( $current->{'raw'}[33] > $sum_so_far->{'raw'}[33] ) {
        $sum_so_far->{'raw'}[33] = $current->{'raw'}[33];
    }

    #total
    $sum_so_far->{'raw'}[34] += $current->{'raw'}[34];

    return $sum_so_far;
}

sub sum_counter {
    my ( $current, $sum_so_far ) = @_;

    $sum_so_far->{'raw'}[0] += $current->{'raw'}[0];
    return $sum_so_far;
}

sub sum_tally {
    my ( $current, $sum_so_far ) = @_;

    for ( my $j = 0 ; $j < 3 ; $j++ ) {
        $sum_so_far->{'raw'}[$j] += $current->{'raw'}[$j];
    }

    return $sum_so_far;
}

my @scales = (
    "Bytes",     "Kilobytes", "Megabytes", "Gigabytes",
    "Terabytes", "Petabytes", "Exabytes"
);

my @bin_names = (
    "0 bytes",   "1 byte",    "2 bytes",   "4 bytes",
    "8 bytes",   "16 bytes",  "32 bytes",  "64 bytes",
    "128 bytes", "256 bytes", "512 bytes", "1kb",
    "2kb",       "4kb",       "8kb",       "16kb",
    "32kb",      "64kb",      "128kb",     "256kb",
    "512kb",     "1mb",       "2mb",       "4mb",
    "8mb",       "16mb",      "32mb",      "64mb",
    "128mb",     "256mb",     "512mb",     "overflow"
);

sub show_counter {
    my ($d) = @_;

    my $ret = "";
    my $toshow;
    foreach my $counter ( sort keys %{$d} ) {

        if ( $d->{$counter}{raw}[0] != 0 or $conf{"show-all-stats"} ) {
            if ( defined $toshow ) {
                $ret .=
"  Counter: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]','$d->{$counter}{name}' = '$d->{$counter}{raw}[0]'\n";
                undef $toshow;
            } else {
                $toshow = $counter;
            }
        }
    }

    if ( defined $toshow ) {
        $ret .= "  Counter: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]'\n";
    }

    return $ret;
}

sub show_attr {

    # Hopefully have an array at this point.
    my ($d) = @_;

    my $ret = "";
    my $toshow;
    foreach my $attr ( sort keys %{$d} ) {
        next unless defined $d->{$attr}{raw}[0];
        if ( defined $toshow ) {
            $ret .=
"  Attribute: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]', '$d->{$attr}{name}' = '$d->{$attr}{raw}[0]'\n";
            undef $toshow;
        } else {
            $toshow = $attr;
        }

    }
    if ( defined $toshow ) {
        $ret .=
          "  Attribute: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]'\n";

    }
    return $ret;
}

sub show_tally {
    my ($d) = @_;

    my $ret = "";
    foreach my $tally ( sort keys %{$d} ) {
        if ( $d->{$tally}{raw}[0] or $conf{"show-all-stats"} ) {
            $ret .= sprintf(
                "%16s: Total: %d Active: %d HWM: %d\n",
                $d->{$tally}{name},   $d->{$tally}{raw}[0],
                $d->{$tally}{raw}[1], $d->{$tally}{raw}[2]
            );
        }
    }
    return $ret;
}

sub show_bin {
    my ($d) = @_;

    my $ret = "";
    foreach my $bin ( sort keys %{$d} ) {

        if (   $d->{$bin}{raw}[0] || $d->{$bin}{raw}[34]
            or $conf{"show-all-stats"} )
        {
            my $total = $d->{$bin}{raw}[34];
            my $scale = 0;

            while ( $total > 1024 ) {
                $total /= 1024;
                $scale++;
            }

            $ret .= sprintf(
"%16s: min $d->{$bin}{raw}[32] max $d->{$bin}{raw}[33] total $d->{$bin}{raw}[34] (%0.2f $scales[$scale])\n",
                $d->{$bin}{name}, $total );

            my @vals;
            for ( my $j = 0 ; $j < 32 ; $j++ ) {
                if ( $d->{$bin}{raw}[$j] or $conf{"show-all-stats"} > 1 ) {
                    push(
                        @vals,
                        sprintf( "%9s: %10d",
                            $bin_names[$j], $d->{$bin}{raw}[$j] )
                    );

                    if ( $#vals == 2 ) {
                        $ret .= sprintf( "  %s\n", join( " ", @vals ) );
                        undef @vals;
                    }
                }
            }
            if ( $#vals != -1 ) {
                $ret .= sprintf( "  %s\n", join( " ", @vals ) );
                undef @vals;
            }
        }
    }
    return $ret;
}

# These must stay in the correct order, that is the order
# they appear in shared memory.
my @stat_types = ( "Counter", "Tally", "Bin", "Attribute" );

my @display_order = qw(Attribute Counter Tally Bin);

my %stat_types2 = (
    Counter =>
      { size => "1", displayfn => \&show_counter, sumfn => \&sum_counter },
    Tally => { size => "3",  displayfn => \&show_tally, sumfn => \&sum_tally },
    Bin   => { size => "35", displayfn => \&show_bin,   sumfn => \&sum_bin },
    Attribute => { size => "1", displayfn => \&show_attr, sumfn => \&sum_attr },
);

sub parse_header {
    my ($block) = @_;
    my @a = split( ",", $block );

    my @header;

    if ( $a[0] ne "ELAN STATS" or $a[1] ne "falcon" ) {
        return undef;
    }

    my $index;
    for ( $index = 0 ; $index < 4 ; $index++ ) {
        $header[$index] = $a[$index];
    }

    while ( $index < $#a ) {
        $header[ $a[$index] ] = $a[ $index + 1 ];
        $index += 2;
    }

    return \@header;
}

# Convert from subsystem ID to name.
sub get_sub_name {
    my ( $id, $header ) = @_;
    $id *= 2;
    $id += 4;
    return $header->[$id];
}

# Convert from subsystem ID and stat type # to count.
sub get_sub_stat_count {
    my ( $id, $type, $header ) = @_;

    # Check for an invalid subsystem number.
    if ( $id >= _hex $header->[3] ) {
        return 0;
    }

    # Check for an invalid stat type.
    if ( $type >= _hex $header->[2] ) {
        return 0;
    }

    # Skip over the four entry header and expand.
    $id *= 2;
    $id += 4;

    # Move from subsystem name to offset.
    $id++;

    # Follow the offset.
    $id = $header->[$id];

    # Move to the correct type.
    $id += $type;
    return $header->[$id];
}

# Params:
# $id              Index of this subsystem.
# $type            This stat type.
# $idx             Number of this stat.
sub get_sub_stat_name {
    my ( $id, $type, $idx, $header ) = @_;

    # Check for an invalid subsystem number.
    if ( $id >= _hex $header->[3] ) {
        return 0;
    }

    # Check for an invalid stat type.
    if ( $type >= _hex $header->[2] ) {
        return 0;
    }

    # Skip over the four entry header and expand.
    $id *= 2;
    $id += 4;

    # Move from subsystem name to offset.
    $id++;

    # Follow the offset.
    $id = $header->[$id];

    my $offset = $id;

    # Header[2] is the number of stats type's.  4 currently.
    $id += _hex $header->[2];

    for ( my $i = 0 ; $i < $type ; $i++ ) {
        $id += $header->[ $offset + $i ];
    }

    return $header->[ $id + $idx ];
}

sub find_rail {
    my $r = shift;

    my $rail = _hex $r;

    if ( $rail == -1 ) {
        return "ELAN_RAIL_ALL";
    } else {
        return $rail;
    }
}

sub parse_content {
    my ( $block, $header ) = @_;
    my @a = split( ",", $block );
    my $index = 0;

    my @raw_data;

    return undef if ( $#a < 5 );

    for ( $index = 0 ; $index < 4 ; $index++ ) {
        $raw_data[$index] = _hex( $a[$index] );
    }

    while ( $index < $#a ) {
        $raw_data[ $a[$index] ] = $a[ $index + 1 ];
        $index += 2;
    }

    my %process_details;

    $process_details{vp}      = $raw_data[0];
    $process_details{nvp}     = $raw_data[1];
    $process_details{localid} = $raw_data[2];
    $process_details{nlocal}  = $raw_data[3];

    my $instBase = 4;

    while ( $instBase != 0 ) {
        my $sysId = _hex( $raw_data[$instBase] );
        my $sysname = get_sub_name( $sysId, $header );

        my %inst;

        $inst{sysId}      = _hex( $raw_data[$instBase] );
        $inst{name}       = get_sub_name( $sysId, $header );
        $inst{id}         = _hex $raw_data[ $instBase + 1 ];
        $inst{handle}     = $raw_data[ $instBase + 2 ];
        $inst{stats}      = _hex $raw_data[ $instBase + 6 ];
        $inst{rail}       = find_rail $raw_data[ $instBase + 4 ];
        $inst{next}       = _hex $raw_data[ $instBase + 5 ];
        $inst{valid}      = _hex $raw_data[ $instBase + 3 ];
        $inst{debugFlags} = $raw_data[ $instBase + 7 ];

        if ( $inst{stats} ) {
            my %stats;

            my $offset = $inst{stats};

            for ( my $type = 0 ; $type < $#stat_types + 1 ; $type++ ) {
                my $typename = $stat_types[$type];
                my $count = get_sub_stat_count( $inst{sysId}, $type, $header );

                next if $count eq 0;
                my %type;
                for ( my $idx = 0 ; $idx < $count ; $idx++ ) {
                    my %data;
                    my @raw;
                    for (
                        my $value = 0 ;
                        $value < $stat_types2{$typename}{size} ;
                        $value++
                      )
                    {
                        $raw[$value] = _hex $raw_data[$offset];
                        $offset++;
                    }
                    $data{name} =
                      get_sub_stat_name( $inst{sysId}, $type, $idx, $header );
                    $data{raw} = \@raw;

                    $type{ $data{name} } = \%data;
                }
                $stats{$typename} = \%type;
            }
            $inst{statistics} = \%stats;
        } else {
            $process_details{complete} = 0;
        }

        $instBase = $inst{next};

        delete $inst{stats};
        delete $inst{next};
        delete $inst{sysId};
        delete $inst{debugFlags} if ( !$inst{debugFlags} );

        $process_details{subsystems}{ $inst{name} }{ $inst{id} } = \%inst;

    }

    # Work out if there is missing data.
    if (
        defined $process_details{subsystems}{Core}{1}{statistics}{Counter}
        {Overflow} )
    {
        if ( not defined $process_details{complete} ) {
            if ( $process_details{subsystems}{Core}{1}{statistics}{Counter}
                {Overflow}{raw}[0] == 0 )
            {
                $process_details{complete} = 1;
            } else {
                $process_details{complete} = 0;
            }
        }
    }

    return \%process_details;
}

sub total {
    my ($data_structures_aref) = @_;
    my %total;         #holds data structures keyed by name and id
    my @keys_order;    #keep the order new names and ids were encountered

    # Make an initial total by just copying the first set of stats
    # carte blance.
    my $summed_structure = dclone( $data_structures_aref->[0] );

    for ( my $cr = 1 ; $cr <= $#{$data_structures_aref} ; $cr++ ) {
        my $current_structure = $data_structures_aref->[$cr];

        # Copy the vp and nvp information, we might miss new entries
        # in current_structure but for the time being all entries
        # are known to be common.
        # XXX: This assertion no longer holds true, {complete} is only
        # defined where it is known and has values 0 and 1.
        # Having said that it will all work though as if it's value is not
        # known it can't be 1 which is the only value we care about.
        foreach my $header ( keys %{$summed_structure} ) {
            next if ( $header eq 'subsystems' );
            if ( defined $summed_structure->{$header}
                and $summed_structure->{$header} ne
                $current_structure->{$header} )
            {
                $summed_structure->{$header} = undef;
            }
        }

        #add to each set of stats if it exists, else clone the new set
        foreach my $name ( keys %{ $current_structure->{'subsystems'} } ) {
            if ( $summed_structure->{'subsystems'}{$name} ) {
                foreach
                  my $id ( keys %{ $current_structure->{'subsystems'}{$name} } )
                {
                    if ( $summed_structure->{'subsystems'}{$name}{$id} ) {

                        next
                          unless (
                            defined $current_structure->{'subsystems'}{$name}
                            {$id}{'statistics'} );

                        if (
                            not( $summed_structure->{'subsystems'}{$name}{$id}
                                {'statistics'} )
                          )
                        {

                            $summed_structure->{'subsystems'}{$name}{$id}
                              {'statistics'} = dclone(
                                $current_structure->{'subsystems'}{$name}{$id}
                                  {'statistics'} );
                            next;
                        }

                        my %current_stat =
                          %{ $current_structure->{'subsystems'}{$name}{$id}
                              {'statistics'} };

                        my %summed_stat =
                          %{ $summed_structure->{'subsystems'}{$name}{$id}
                              {'statistics'} };

                        #add to each type of stats if it exists, else copy
                        #the new set
                        foreach my $stat_type ( keys %current_stat ) {
                            if ( $summed_stat{$stat_type} ) {

                                foreach my $stat_name (
                                    keys %{ $current_stat{$stat_type} } )
                                {
                                    if (
                                        $summed_stat{$stat_type}->{$stat_name} )
                                    {

                                #do the adding up correctly for the type of stat
                                        $summed_stat{$stat_type}->{$stat_name} =
                                          $stat_types2{$stat_type}{sumfn}(
                                            $current_stat{$stat_type}
                                              ->{$stat_name},
                                            $summed_stat{$stat_type}
                                              ->{$stat_name}
                                          );
                                    } else {
                                        $summed_stat{$stat_type}->{$stat_name} =
                                          dclone( $current_stat{$stat_type}
                                              ->{$stat_name} );
                                    }
                                }
                            } else {
                                $summed_stat{$stat_type} =
                                  dclone( $current_stat{$stat_type} );
                            }
                        }
                    } else {
                        $summed_structure->{'subsystems'}{$name}{$id} = dclone(
                            $current_structure->{'subsystems'}{$name}{$id} );
                    }
                }
            } else {
                $summed_structure->{$name} =
                  dclone( $current_structure->{$name} );
            }
        }
    }

    return $summed_structure;
}

# Convert from long to terse stats.
sub summarise {

    my $datastructure = shift;
    my %ret           = (
        'Bin'     => 0,
        'Counter' => 0,
        'Tally'   => 0
    );
    if ( defined $datastructure->{'vp'} ) {
        $ret{'vp'} = $datastructure->{'vp'};
    }
    foreach my $subsystem ( keys %{ $datastructure->{'subsystems'} } ) {
        foreach my $id ( keys %{ $datastructure->{'subsystems'}{$subsystem} } )
        {
            my $statistics =
              $datastructure->{'subsystems'}{$subsystem}{$id}{'statistics'};
            foreach my $bin ( keys %{ $statistics->{'Bin'} } ) {

                #Bin has a total value so just add that
                $ret{'Bin'} += $statistics->{'Bin'}{$bin}{'raw'}[34];
            }
            foreach my $counter ( keys %{ $statistics->{'Counter'} } ) {
                $ret{'Counter'} += $statistics->{'Counter'}{$counter}{'raw'}[0];
            }
            foreach my $tally ( keys %{ $statistics->{'Tally'} } ) {
                $ret{'Tally'} += $statistics->{'Tally'}{$tally}{'raw'}[0];
            }
        }
    }
    return \%ret;
}

sub summarise_many {
    my $many = shift;
    my @ret;
    foreach my $single ( @{$many} ) {
        push( @ret, summarise($single) );
    }
    return \@ret;
}

sub collapse_summaries {
    my $summaries = shift;
    my %ret       = (
        'Bin'     => 0,
        'Counter' => 0,
        'Tally'   => 0
    );
    foreach my $summary ( @{$summaries} ) {
        foreach my $key ( keys %ret ) {
            $ret{$key} += $summary->{$key};
        }
    }
    return \%ret;
}

sub display_hash {
    my $hash = shift;
    format WITH_VP =
vp @>>>> Counter @>>>>>>>>> Tally @>>>>>>>>> Bin @>>>>>>>>>>>>>>
$hash->{vp}, $hash->{Counter}, $hash->{Tally}, $hash->{Bin}
.
    format WITHOUT_VP =
Counter @>>>>>>>>> Tally @>>>>>>>>> Bin @>>>>>>>>>>>>>>
$hash->{Counter}, $hash->{Tally}, $hash->{Bin}
.
    local $~;
    if ( defined $hash->{vp} ) {
        $~ = "WITH_VP";
    } else {
        $~ = "WITHOUT_VP";
    }
    write STDOUT;
}

sub display_hashes {
    my ( $hashes, $sort, $reverse ) = @_;
    my $ret = '';

    my $rev = $reverse;

    $rev = not $rev if ( $sort eq "vp" );

    if ($rev) {
        foreach my $e ( sort { $a->{$sort} <=> $b->{$sort} } ( @{$hashes} ) ) {
            $ret .= display_hash($e);
        }
    } else {
        foreach my $e ( sort { $b->{$sort} <=> $a->{$sort} } ( @{$hashes} ) ) {
            $ret .= display_hash($e);
        }
    }
    return $ret;
}

# FIXME:  This function really should be merged with as show_inst...
sub show_name {
    my ( $des, $stats ) = @_;

    if ( not defined $des ) {
        return show_inst($stats);
    }

    my @req = split( "\\.", $des );

    my $ret = "";

    foreach my $name2 ( sort keys %{ $stats->{subsystems} } ) {
        my $name = $stats->{subsystems}{$name2};

        next unless ( lc($name2) eq lc( $req[0] ) );

        foreach my $id2 ( sort { $a <=> $b } keys %{$name} ) {
            my $sis = $name->{$id2};

            next if ( $#req > 0 and $sis->{id} ne $req[1] );

            if ( $#req < 2 ) {
                $ret .=
"Subsystem '$sis->{name}' id: $sis->{id}  Handle: $sis->{handle} rail: $sis->{rail}\n";
            }

            foreach my $type (@display_order) {
                next unless defined $sis->{statistics}{$type};

                if ( $#req > 1 ) {

                    foreach
                      my $s_name ( sort keys %{ $sis->{statistics}{$type} } )
                    {
                        next if ( $#req > 1 and lc($s_name) ne lc( $req[2] ) );
                        $ret .= "@{$sis->{statistics}{$type}{$s_name}{raw}}\n";
                    }
                } else {
                    if ( defined $stat_types2{$type}{displayfn} ) {
                        $ret .=
                          $stat_types2{$type}{displayfn}(
                            $sis->{statistics}{$type} );
                    }
                }
            }
        }
    }
    return $ret;
}

sub show_inst {
    my ($stats) = @_;

    my $ret;

    if ( defined $stats->{vp} ) {
        $ret = "This is vp $stats->{vp}/$stats->{nvp}\n";
    } else {
        $ret = "Statistics for a $stats->{nvp} process job\n";
    }

    foreach my $name2 ( sort keys %{ $stats->{subsystems} } ) {
        my $name = $stats->{subsystems}{$name2};

        foreach my $id2 ( sort { $a <=> $b } keys %{$name} ) {
            my $sis = $name->{$id2};

            $ret .=
"Subsystem '$sis->{name}' id: $sis->{id}  Handle: $sis->{handle} rail: $sis->{rail}\n";

            if ( not defined $sis->{statistics} ) {
                $ret .= "no statistics recorded.\n";
                next;
            }

            foreach my $type (@display_order) {
                next unless defined $sis->{statistics}{$type};

                if ( defined $stat_types2{$type}{displayfn} ) {
                    $ret .=
                      $stat_types2{$type}{displayfn}(
                        $sis->{statistics}{$type} );

                }
            }
        }
    }
    return $ret;
}

sub read_stats {
    my @data = @_;

    printf Dumper \@data if $conf{"dump-raw"};

    my $header = parse_header( shift @data );

    return undef unless $header;

    my @out;
    foreach my $vp (@data) {
        my $parsed = parse_content( $vp, $header );
        if ( defined $parsed ) {
            push( @out, $parsed );
        }
    }

    printf Dumper \@out if $conf{"dump-raw"};

    return \@out;
}

sub show_stats {
    my $d = shift;

    # This function is slightly delicate, the --full-report option
    # calls this function with $stats_total and $group set.

    # What to do about the -r option:
    # If it's set then display individual results for the given
    # vp's only, if it's not set then display a total for everyone.

    if ( not $d ) {
        print("QsNet Statistics not valid\n");
        return;
    }

    if ($stats_total) {

        if ( $conf{"stats-short"} ) {
            my $new;
            if ( $#ranks != -1 ) {
                my @ret;
                foreach my $rank (@ranks) {
                    if ( defined $d->[$rank] ) {
                        push( @ret, summarise( $d->[$rank] ) );
                    } else {
                        my $vps = $#{$d} + 1;
                        print "Invalid rank $rank (0 to $vps)\n";
                    }
                }
                $new = \@ret;
            } else {
                $new = summarise_many($d);
            }

            display_hashes( $new, $conf{"stats-sort-key"},
                $conf{"stats-reverse"} );
            return;
        }

        if ( $#ranks != -1 ) {
            foreach my $rank (@ranks) {
                if ( defined $d->[$rank] ) {
                    print show_name $conf{"stats-name"}, $d->[$rank];
                } else {
                    my $vps = $#{$d} + 1;
                    print "Invalid rank $rank (0 to $vps)\n";
                }
            }
        } else {
            print show_name $conf{"stats-name"}, total($d);
        }
    }

    if ($group) {
        print group_status($d);
    }
}

###############################################################################
#
# Group deadlock detection
#
###############################################################################

sub group_status_helper {
    my $str        = shift;    # tagged onto the end of the line.
    my $possessive = shift;    # syntax to use (possessive/attributive)
    my $size       = shift;    # size of the group
    my @identical  = @_;       # member list
    my $ret;
    my $sstr = defined $size ? " (size $size)" : "";

    my $members = "members";
    my $are     = "are";
    my $have    = "have";

    if ( $#identical == 0 ) {
        $members = "member";
        $are     = "is";
        $have    = "has";
    }

    if ($possessive) {
        $are = $have;
    }

    $ret .=
      sprintf( "Group $members %s$sstr $are $str.\n", compress(@identical) );

    return $ret;
}

sub group_status {
    my $data_structures_aref = shift;

    my %ad;

    my @tg;

    if ( $#target_groups != -1 ) {
        foreach my $gid (@target_groups) {
            $tg[$gid]++;
        }
    }

    # Loop over each vp...
    foreach my $dataset ( @{$data_structures_aref} ) {

        # Loop over each group within the process.
        foreach my $gid ( keys %{ $dataset->{'subsystems'}{'Group'} } ) {

            if ( $#target_groups != -1 ) {
                next unless defined $tg[$gid];
            }

            my $str;

            my $this_group = $dataset->{'subsystems'}{'Group'}{$gid};

            my $ident = $dataset->{vp};

            if ( $this_group->{'statistics'} ) {

                # XXX: Why is this first test here,
                if (    $this_group->{statistics}{Attribute}
                    and $this_group->{statistics}{Attribute}{Self} )
                {
                    $ident = $this_group->{statistics}{Attribute}{Self}{raw}[0];
                    $ad{$gid}{size} =
                      $this_group->{statistics}{Attribute}{Size}{raw}[0];
                    $ad{$gid}{map}[$ident] = $dataset->{vp}
                      if ( $conf{"show-group-members"} );
                }

                $ad{$gid}{idents}{$ident}{'statistics'}++;

                foreach
                  my $tally ( keys( %{ $this_group->{statistics}{Tally} } ) )
                {
                    my $name = $this_group->{statistics}{Tally}{$tally}{'name'};
                    my $number =
                      $this_group->{statistics}{Tally}{$tally}{'raw'}[0];
                    my $active =
                      $this_group->{statistics}{Tally}{$tally}{'raw'}[1];
                    if ( $active != 0 ) {
                        $ad{$gid}{'active'}{$name}++;
                        $ad{$gid}{idents}{$ident}{'active'}{$name} = $number;
                    } else {
                        $ad{$gid}{idents}{$ident}{'inactive'}{$name} = $number;
                    }
                }
            }
            $ad{$gid}{idents}{$ident}{'valid'} = $this_group->{'valid'};
        }
    }

    my $ret = "";
    my $missing_self;
    my $i_count = 0;    # Interesting groups.
    my $d_count = 0;    # Destroyed groups.
    foreach my $gid ( sort { $a <=> $b } keys %ad ) {

        if ( $#target_groups != -1 ) {
            next unless defined $tg[$gid];
        }

        my $gstr = "Information for group '$gid'\n";

        # Maybe show the group members, hope that the user doesn't turn
        # this on unless also setting target_groups!
        if ( $conf{"show-group-members"} ) {
            $gstr .= "group has $ad{$gid}{size} members\n";
            if ( defined $ad{$gid}{size} and $gid != 1 ) {
                for ( my $ident = 0 ; $ident < $ad{$gid}{size} ; $ident++ ) {
                    $gstr .=
                      "group member[$ident] => vp[$ad{$gid}{map}[$ident]]\n";
                }
            }
        }

        my $gone;
        {
            my @invalid;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                if ( $ad{$gid}{'idents'}{$ident}{'valid'} eq 0 ) {
                    push @invalid, $ident;
                }
            }
            if ( $#invalid != -1 ) {
                if ( $conf{"show-all-groups"} ) {
                    $ret .= $gstr
                      . group_status_helper( "showing the group as removed",
                        0, $ad{$gid}{size}, @invalid );
                    $gstr = "";
                }
                if ( $#invalid == ( $ad{$gid}{size} - 1 ) ) {
                    $gone++;
                    $d_count++;
                }
            }
        }
        next if $gone;

        # Find and report groups which don't have statistics
        {
            my @identical;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                push( @identical, $ident )
                  unless ( $ad{$gid}{'idents'}{$ident}{'statistics'} );
            }
            if ( $#identical != -1 ) {
                $missing_self++;
                if ( $conf{"show-all-groups"} ) {
                    $ret .= $gstr
                      . group_status_helper(
                        "no statistics for this group *(1)",
                        1, $ad{$gid}{size}, @identical );
                    $gstr = "";
                } else {
                    $gstr .=
                      group_status_helper( "no statistics for this group *(1)",
                        1, $ad{$gid}{size}, @identical );
                }
            }
        }

        if ( $ad{$gid}{'active'} ) {
            $i_count++;

            # For all collective calls which we are interested in
            foreach my $s ( keys %{ $ad{$gid}{'active'} } ) {
                my %active;
                my %inactive;

                foreach my $ident ( keys %{ $ad{$gid}{'idents'} } ) {
                    if ( defined $ad{$gid}{'idents'}{$ident}{'active'}
                        and $ad{$gid}{'idents'}{$ident}{'active'}{$s} )
                    {
                        my $number = $ad{$gid}{'idents'}{$ident}{'active'}{$s};
                        push( @{ $active{$number} }, $ident );
                    } elsif ( $ad{$gid}{'idents'}{$ident}{'inactive'}{$s} ) {
                        my $number =
                          $ad{$gid}{'idents'}{$ident}{'inactive'}{$s};
                        push( @{ $inactive{$number} }, $ident );
                    }
                }
                foreach my $number ( sort ( keys %active ) ) {
                    $ret .= $gstr
                      . group_status_helper( "in call $number to $s",
                        0, $ad{$gid}{size}, @{ $active{$number} } );
                    $gstr = "";

                }
                foreach my $number ( sort ( keys %inactive ) ) {
                    $ret .= group_status_helper( "completed call $number to $s",
                        1, $ad{$gid}{size}, @{ $inactive{$number} } );
                }
            }
        } else {
            next unless ( $conf{"show-all-groups"} );
        }

        {
            my @inactive;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                if ( $ad{$gid}{'idents'}{$ident}{'statistics'}
                    and not defined $ad{$gid}{'idents'}{$ident}{'active'} )
                {
                    push( @inactive, $ident );
                }
            }
            if ( $#inactive != -1 ) {
                $ret .= $gstr
                  . group_status_helper( "not in a call to the collectives",
                    0, $ad{$gid}{size}, @inactive );
                $gstr = "";
            }
        }
    }

    my $count = keys(%ad);

    if ( $count == 1 ) {
        my $use_str = ( $i_count == 1 ) ? "" : " not";
        $ret .= "Total: $count group which is$use_str in use.\n";
    } else {
        my $d_str = ( $d_count == 1 ) ? "is" : "are";
        my $i_str = ( $i_count == 1 ) ? "is" : "are";
        $ret .=
"Total: $count groups of which $d_count $d_str destroyed and $i_count $i_str in use.\n";
    }

    if ($missing_self) {
        $ret .= "\n(1) Groups that have no statistics are reported by vp\n";
        $ret .= "rather than group id\n";
    }

    return "$ret";
}

###############################################################################
#
# Local (per node) stats.
#
###############################################################################

sub local_stats_from_job {
    my $job = shift;

    printf("Showing local job $job\n");

    my $key = rms_job_to_key($job);

    if ( not defined $key ) {
        printf("Cannot find key for local job $job\n");
        return;
    }

    my @data;
    open( PCMD, "edb -k $key --stats-raw 2>/dev/null|" )
      or die "$prog: cant open file: $!\n";
    local $/ = "\n\n";
    while (<PCMD>) {
        s/\n//g;
        push @data, $_;
    }

    # print Dumper \@data;

    my $s = read_stats(@data);

    $stats_total = 1;

    show_stats($s);
}

# Show stats for all jobs on this node.
sub local_stats {
    opendir( DH, "/proc/rms/programs" );
    my @files = readdir(DH);
    closedir(DH);

    foreach my $job (@files) {
        next if ( $job eq ".." );
        next if ( $job eq "." );

        local_stats_from_job($job);

    }
}

###############################################################################
#
# Stack trace tree compression.
#
###############################################################################

#
# Compare two lists-o-strings
#	\@l1 (IN)	list1
#	\@l2 (IN)	list2
#	RETURN		1 if match, 0 if not
#
sub cmp_list {
    my ( $l1, $l2 ) = @_;

    if ( $#{$l1} != $#{$l2} ) {
        return 0;
    }

    for ( my $i = 0 ; $i <= $#{$l1} ; $i++ ) {
        if ( !defined( ${$l2}[$i] ) || ${$l1}[$i] ne ${$l2}[$i] ) {
            return 0;
        }
    }

    return 1;
}

# This function returns an reference to an array of hashes, each
# hash containing the "txt" of the function name and a further array
# of hash references called "children".
sub go_p {
    my ( $level, $lines, @tags ) = @_;

    my @peers;
    my $prev;
    my $tag = $tags[0];

    printf("called tag:$tag, level:$level tags:@tags\n")
      if $conf{"tree-verbose"};

    return if ( !defined($tag) );
    return if ( !defined( $lines->{$tag} ) );

    my @identical = ();
    my @different = ();

    my $endlevel = $level;

    # Populate the two lists, @identical and @different
    my $line = $lines->{$tag}[$level];
    if ( defined $line ) {
        foreach my $tag2 (@tags) {
            next if ( $tag2 eq $tag );
            if ( defined( $lines->{$tag2}[$level] )
                and $line eq $lines->{$tag2}[$level] )
            {
                push( @identical, $tag2 );
                delete( $lines->{$tag2}[$level] );
            } else {
                push( @different, $tag2 );
            }
        }
    } else {
        foreach my $dtag (@tags) {
            if ( $dtag != $tag ) {
                push( @different, $dtag );
            }
        }

    }

    # Move $endlevel on as far as we can...
    if ( $#identical >= 0 ) {
        my $nextIdentical;
        do {
            $nextIdentical = 0;
            my $nextFound = 0;
            $endlevel++;
            if ( defined $lines->{$tag}[$endlevel] ) {
                foreach my $tag2 (@identical) {
                    if ( defined( $lines->{$tag2}[$endlevel] )
                        and $lines->{$tag}[$endlevel] eq
                        $lines->{$tag2}[$endlevel] )
                    {
                        $nextFound++;
                    }
                }
            }
            if ( ( $#identical + 1 ) == $nextFound ) {
                $nextIdentical = 1;
            }
        } while $nextIdentical;
        $endlevel--;
    } else {
        $endlevel = ( $#{ $lines->{$tag} } );
    }

    printf(
"level $level, endlevel $endlevel, identical:@identical different:@different\n",
    ) if $conf{"tree-verbose"};

    for ( my $l = $level ; $l <= $endlevel ; $l++ ) {

        my %this;
        $this{txt} = $lines->{$tag}[$l];
        @{ $this{vps} } = ( $tag, @identical );
        $this{vpspec} = compress( @identical, $tag );

        if ( defined $prev ) {
            push @{ $prev->{children} }, \%this;
        } else {
            push @peers, \%this;
        }

        $prev = \%this;

    }

    if ( $#identical >= 0 ) {

        if ( $endlevel != $#{ $lines->{$tag} } + 1 ) {
            unshift @identical, $tag;
        }

        $prev->{children} = go_p( $endlevel + 1, $lines, @identical );
    }

    printf(
"returning level:$level endlevel:$endlevel identical:@identical different:@different\n"
    ) if $conf{"tree-verbose"};

    if (@different) {
        my $new = go_p( $level, $lines, @different );
        foreach my $n ( @{$new} ) {
            push @peers, $n;
        }
    }

    return \@peers;
}

# Takes a ref to a array of hashes...
sub _show_tree {

    my ( $ref, $parent, $indent ) = @_;

    my $ret = "";
    my @peers = sort ( { $a->{vps}[0] <=> $b->{vps}[0] } ( @{$ref} ) );

    foreach my $peer (@peers) {

        if ( $#peers != 0 or not defined $parent or $parent ne $peer->{vpspec} )
        {
            my $count   = $#{ $peer->{vps} } + 1;
            my $i_level = "$peer->{vpspec} ($count processes)";
            $ret .= "$indent-----------------\n";
            $ret .= "$indent$i_level\n";
            $ret .= "$indent-----------------\n";
        }

        $ret .= "$indent$peer->{txt}\n";
        if ( defined $peer->{children} ) {
            $ret .=
              _show_tree( $peer->{children}, $peer->{vpspec}, "$indent  " );
        }
    }
    return $ret;
}

sub show_tree {
    my $ref = shift;
    return _show_tree( $ref, undef, "" );
}

# This function is used to process the line tags, it changes fab0 fab1 fab10 into
# fab[0-1,10].
sub compress {
    my %rng  = comp(@_);
    my @list = ();

    # comp returns a hash of arrays, the hash keys are the machines names
    # eg "fab" or "fabi", the arrays have zero or more elements in each one
    # specifies a node-spec.  If there is only one element in the array and
    # it doesn't contain a "-" then don't put square braces around the list
    # contents

    local $" = ",";    # "

    @list = map {
        $_
          . (
            @{ $rng{$_} } > 1 || ${ $rng{$_} }[0] =~ /-/
            ? "[@{$rng{$_}}]"
            : "@{$rng{$_}}"
          )
    } sort keys %rng;

    return wantarray ? @list : "@list";
}

# sortn:
#
# # sort a group of alphanumeric strings by the last group of numerals in
# that string
#
sub sortn {
    map { $$_[0] }
      sort { ( $$a[1] || 0 ) <=> ( $$b[1] || 0 ) } map { [ $_, /(\d*)$/ ] } @_;
}

sub comp {
    my (%i) = ();
    my (%s) = ();

    # turn off warnings here to avoid perl complaints about
    # uninitialized values for members of %i and %s
    local ($^W) = 0;
    push(
        @{
            $s{ $$_[0] }[
              (
                  $s{ $$_[0] }[ $i{ $$_[0] } ]
                    [ $#{ $s{ $$_[0] }[ $i{ $$_[0] } ] } ] == ( $$_[1] - 1 )
              ) ? $i{ $$_[0] } : ++$i{ $$_[0] }
            ]
          },
        ( $$_[1] )
    ) for map { [/(.*?)(\d*)$/] } sortn(@_);

    for my $key ( keys %s ) {
        @{ $s{$key} } =
          map { $#$_ > 0 ? "$$_[0]-$$_[$#$_]" : @{$_} } @{ $s{$key} };
    }

    return %s;
}

###############################################################################
#
# RMS support.
#
###############################################################################

sub find_exe {
    my $name = shift;
    foreach my $dir ( split( ":", $ENV{PATH} ) ) {
        return 1 if ( -x "$dir/$name" );
    }
    return 0;
}

sub rms_is_installed {
    return find_exe("prun");
}

sub rms_get_jobs {
    my $user = shift;
    my @res =
`rmsquery "select jobs.name from jobs,resources where jobs.status=\'running\' and jobs.resource = resources.name and resources.username=\'$user\'"`;
    chomp @res;
    return @res;
}

sub rms_job_is_running {
    my $job    = shift;
    my $status = `rmsquery "select status from jobs where name=\'$job\'"`;
    chomp $status;
    return ( $status eq "running" );
}

sub rms_job_to_key {
    my $job = shift;
    return ( $job << 9 ) - 1;
}

sub rms_setup_pcmd {
    my $job = shift;

    my $res = rms_job_to_resource($job);

    my $ncpus = rms_job_to_ncpus($job);

    if ( $res eq "" ) {
        printf("Job '$job' doesn't have a associated resource\n");
        return undef;
    }

    # Try to prevent zombie jobs, fairly rare but I have seen
    # nodes run different versions of edb which can cause problems
    # XXX: Fixme.  This isn't high enough.
    if ( $conf{"prun-exittimeout"} != 0 ) {
        $ENV{"RMS_EXITTIMEOUT"} = $conf{"prun-exittimeout"};
    }

    if ( $conf{"prun-timeout"} != 0 ) {
        $ENV{"RMS_TIMELIMIT"} = $conf{"prun-timeout"};
    }

    {

        # Work around a couple of bugs in RMS
        # the first one is really old and was there
        # for a while, the second one is limited
        # to 'qsrmslibs-2.82-15'
        my $partition = rms_res_to_partition($res);
        $ENV{"RMS_PARTITION"}  = "$partition";
        $ENV{"RMS_RESOURCEID"} = "$partition.$res";
    }

    my $cmd = "prun -i /dev/null -T $res";

    return ( $cmd, $ncpus );
}

# Not exported...
sub rms_job_to_resource {
    my $job = shift;
    my $res = `rmsquery "select resource from jobs where name=\'$job\'"`;
    chomp $res;
    return $res;
}

sub rms_job_to_ncpus {
    my $job   = shift;
    my $cpus  = `rmsquery "select cpus from jobs where name=\'$job\'"`;
    my $nodes = `rmsquery "select nodes from jobs where name=\'$job\'"`;

    chomp $cpus;
    chomp $nodes;

    my $ncpus = 0;

    my @c = map { $_ =~ /(\d+)-(\d+)/ ? $2 - $1 + 1 : 1 } ( split " ", $cpus );

    my @n = map { $_ =~ /(\d+)-(\d+)/ ? $2 - $1 + 1 : 1 } ( split " ", $nodes );

    for ( my $idx = 0 ; $idx <= $#n ; $idx++ ) {
        $ncpus += $n[$idx] * $c[$idx];
    }

    printf("extracted $ncpus from $cpus and $nodes\n") if $conf{"verbose"} > 1;

    return $ncpus;
}

sub rms_res_to_partition {
    my $res  = shift;
    my $part = `rmsquery "select partition from resources where name=\'$res\'"`;
    chomp $part;
    return $part;
}

###############################################################################
#
# Slurm support.
#
###############################################################################

sub slurm_is_installed {
    return find_exe("srun");
}

sub slurm_get_jobs {
    my $user = shift;
    my @res  = `squeue -t running -u $user -h -o "%i" 2>/dev/null`;
    chomp @res;
    return @res;
}

sub slurm_job_to_ncpus {
    my $job   = shift;
    my @steps = `squeue -s -o "%i %A" 2>/dev/null`;
    return undef if ( $? != 0 );

# The %A option is new so ensure we have the TASKS output before we believe what we see here...
    my $tasks;
    my $have_tasks = 0;
    my $s          = "$job." . $conf{"slurm-job-step"};
    foreach my $step (@steps) {
        my ( $step, $cpus ) = split( " ", $step );
        $tasks      = $cpus if ( $step eq $s );
        $have_tasks = 1     if ( $cpus eq "TASKS" );
    }
    return $tasks if $have_tasks;
    return undef;
}

sub slurm_job_is_running {
    my $job    = shift;
    my $status = lc `squeue -h -j $job -o "%T"`;
    chomp $status;
    return ( $status eq "running" );
}

sub slurm_setup_pcmd {
    my $job  = shift;
    my $cpus = slurm_job_to_ncpus($job);
    return ( "srun --jobid=$job", $cpus );
}

###############################################################################
#
# Local support.
#
###############################################################################

sub local_get_jobs {
    my $user = shift;
    opendir( DIR, "/proc/" );
    my @pids = readdir(DIR);
    closedir(DIR);
    my @jobs;
    my $tuid = getpwnam($user);
    return unless defined $tuid;

    foreach my $pid (@pids) {
        next unless ( $pid =~ /^\d+$/ );

        my (
            $dev,  $ino,   $mode,  $nlink, $uid,     $gid, $rdev,
            $size, $atime, $mtime, $ctime, $blksize, $blocks
        ) = stat("/proc/$pid");

        next unless ( $uid eq $tuid );

        push @jobs, $pid;
    }

    return @jobs;
}

sub local_fd_get_jobs_real {
    my $user = shift;
    my $file = shift;
    opendir( DIR, "/proc/" );
    my @pids = readdir(DIR);
    closedir(DIR);
    my @jobs;
    my $tuid = getpwnam($user);
    return unless defined $tuid;

    foreach my $pid (@pids) {
        next unless ( $pid =~ /^\d+$/ );

        my (
            $dev,  $ino,   $mode,  $nlink, $uid,     $gid, $rdev,
            $size, $atime, $mtime, $ctime, $blksize, $blocks
        ) = stat("/proc/$pid");

        next unless ( $uid eq $tuid );

        opendir( DIR, "/proc/$pid/fd" );
        my @fds = readdir(DIR);
        closedir(DIR);
        foreach my $fd (@fds) {
            my $target = readlink("/proc/$pid/fd/$fd");
            next unless $target;
            if ( $target eq $file ) {
                push @jobs, $pid;
                last;
            }
        }
    }

    return @jobs;
}

sub local_fd_get_jobs {
    my $user = shift;
    return local_fd_get_jobs_real( $user, $conf{"local-fd-name"} );
}

sub local_q_is_installed {
    return ( -d "/proc/qsnet" );
}

sub local_q_get_jobs {
    my $user = shift;
    return local_fd_get_jobs_real( $user, "/proc/qsnet/elan/user" );
}

sub local_job_is_running {
    my $job = shift;
    return ( -d "/proc/$job" );
}

sub local_setup_pcmd {
    return ( "", undef );
}

###############################################################################
#
# mpd support.
#
###############################################################################

sub mpd_is_installed {
    return find_exe("mpdlistjobs");
}

sub mpd_get_data {
    open( MPD, "mpdlistjobs|" ) or return;
    my @out = <MPD>;
    close MPD;
    my %jobs;
    my $job;
    my $host;
    my $pid;
    foreach my $l (@out) {
        my ( $key, $value ) = split( "= ", $l );
        next unless $value;
        $key =~ s/ //g;
        chomp $value;
        if ( $key eq "jobid" ) {
            my ( $j, $host ) = split( "@", $value );
            $job = $j;
        }
        if ( $key eq "username" ) {
            $jobs{$job}{user} = $value;
        }
        if ( $key eq "host" ) {
            $host = $value;
            $jobs{$job}{host}{$value}++;
        }
        if ( $key eq "pid" ) {
            $pid = $value;
            $jobs{$job}{pids}{$host}{$value}++;
        }
        if ( $key eq "rank" ) {
            $jobs{$job}{pids}{$host}{$pid} = $value;
        }
    }
    return \%jobs;
}

# There is a bug here I think, $user isn't used anywhere
# which is probably bad.
sub mpd_get_jobs {
    my $user = shift;

    my $d    = mpd_get_data();
    my @jobs = keys %{$d};
    return @jobs;
}

my $mpd_dfile;

sub mpd_setup_pcmd {
    my $job = shift;

    my $d = mpd_get_data();

    my @hosts = keys %{ $d->{$job}{host} };
    my $i     = @hosts;

    my ( $fh, $fn ) = tempfile("/tmp/padb.XXXXXXXX");
    foreach my $host (@hosts) {
        print $fh "$host:1\n";
    }
    close $fh;

    $mpd_dfile = $fn;

    my $cmd = "mpirun -machinefile $fn -np $i";

    return ( $cmd, undef );
}

sub mpd_cleanup_pcmd {
    unlink($mpd_dfile) if ( defined($mpd_dfile) );
}

###############################################################################
#
# open support.
#
###############################################################################

sub find_ompi_prefix {
    my $name = "ompi-ps";
    foreach my $dir ( split( ":", $ENV{PATH} ) ) {
        next unless ( -x "$dir/$name" );
        my @d = split( "/", $dir );
        pop @d;
        my $prefix = join( "/", @d );
        return "--prefix $prefix";
    }
    return "";
}

sub open_is_installed {
    return find_exe("ompi-ps");
}

my %open_jobs;

sub open_get_data {
    my ($filename) = @_;

    # Simply return if called more than once.
    if ( keys(%open_jobs) != 0 ) {
        return;

    }
    my $hostname = hostname();
    my $job;
    my @out;
    if ( defined $filename ) {
        open( OPEN, $filename ) or return;
        @out = <OPEN>;
        close OPEN;
    } else {
        open( OPEN, "ompi-ps|" ) or return;
        @out = <OPEN>;
        close OPEN;
    }

    # Handle being called multiple times, zero the hash every
    # time we are called.  Of course we could just return the
    # existing hash which might be quicker.
    %open_jobs = ();

    foreach my $l (@out) {
        chomp $l;
        next if ( $l eq "" );

        if ( $l =~ /Information from mpirun \[(\d+)\,0\]/ ) {

            $job = $1;
        } else {
            my @elems = split( /\|/, $l );

            if ( $#elems == 6 ) {

                my $host = $elems[4];
                $host =~ s/ //g;
                $host =~ s/\t//g;
                next if $host eq "Node";
                $open_jobs{$job}{hosts}{$host}++;

                if ( $host eq $hostname ) {
                    my $name = $elems[1];
                    $name =~ /\[\[(\d+)\,(\d+)\]\,(\d+)\]/;
                    my $rank = $3;

                    my $pid = $elems[3];
                    $rank =~ s/ //g;
                    $pid  =~ s/ //g;
                    $open_jobs{$job}{ranks}{$host}{$rank} = $pid;
                }
            }
        }

    }

    if ( $conf{"verbose"} ) {
        print Dumper \%open_jobs;
    }
}

sub open_get_jobs {
    my $user = shift;

    open_get_data();
    return keys %open_jobs;
}

my $open_dfile;
my $open_tfile;

sub open_setup_pcmd {
    my $job = shift;

    open_get_data();

    my ( $th, $tn ) = tempfile(".padb.XXXX");

    open( my $oh, "ompi-ps|" );
    while (<$oh>) {
        print $th $_;
    }
    close $th;
    $cinner{"open-ps"} = $tn;

    $open_tfile = $tn;

    my @hosts = keys %{ $open_jobs{$job}{hosts} };
    my $i     = @hosts;

    my ( $fh, $fn ) = tempfile("/tmp/padb.XXXXXXXX");

    foreach my $host (@hosts) {
        print $fh "$host\n";
    }
    close $fh;

    $open_dfile = $fn;

    my $prefix = find_ompi_prefix();
    my $cmd    = "mpirun -machinefile $fn -np $i $prefix";

    return ( $cmd, undef );
}

sub open_cleanup_pcmd {
    unlink($open_dfile) if ( defined($open_dfile) );
    unlink($open_tfile) if ( defined($open_tfile) );
}

###############################################################################
#
# lsf support.
#
###############################################################################

sub lsf_is_installed {

    # Check for both LSF and RMS, I know LSF works in other ways but I don't
    # know how to launch jobs then...
    my $rms = find_exe("rinfo");
    return 0 unless $rms;
    return find_exe("bjobs");
}

sub lsf_get_jobs {
    my $user = shift;

    my @jobs;

    open( LSF, "bjobs -r -u $user 2>/dev/null|" ) or return;
    my @out = <LSF>;
    close LSF;
    foreach my $l (@out) {
        my ( $job, $user, $stat, $queue, $from, $exec, $name, $time ) =
          split( " ", $l );
        next if ( $job eq "JOBID" );
        next unless ( defined $time );
        push @jobs, $job;
    }

    return @jobs;
}

# This is a little odd, lsf allocates a resource and then pruns (-n1) the users script
# inside that resource.  That script then calls prun which is the real parallel job,
# In essence then you get one resource and (at least) two jobs, padb needs to target
# the second one.  This is controlled by the -Olsf_job_offset option, the default being
# one.
sub lsf_setup_pcmd {
    my $job = shift;

    my $machine = `rinfo -m`;
    chomp $machine;
    my $query =
      "select name,ncpus from resources where batchid=\'$machine\@$job\'";
    my $result = `rmsquery "$query"`;

    my ( $res, $ncpus ) = split( " ", $result );

    open( QUERY,
"rmsquery \"select name from jobs where jobs.resource=\'$res\' and status = \'running\' order by name\"|"
    );
    my @out = <QUERY>;
    close QUERY;

    my $rjob;

    my $idx = $conf{"lsf-job-offset"};
    $idx = 1 if ( $idx > $#out );
    $rjob = $out[$idx];
    chomp $rjob;
    $rem_jobid = $rjob;

    my $cmd = "prun -i /dev/null -T $res";

    return ( $cmd, $ncpus );
}

###############################################################################
#
# Resource manager support.
#
###############################################################################

sub setup_rmgr {
    $conf{"rmgr"} = shift;

    # Now setup the variable for the rest of the program.
    if ( defined $rmgr{ $conf{"rmgr"} }{inner_rmgr} ) {
        $cinner{rmgr} = $rmgr{ $conf{"rmgr"} }{inner_rmgr};
    } else {
        $cinner{rmgr} = $conf{"rmgr"};
    }
}

sub find_rmgr {

# If it's been set on the command line and it's valid then just use what we are given.
# Don't do any sanity checks here to cope with non-default installs.

    if ( defined $rmgr{ $conf{"rmgr"} } ) {
        setup_rmgr( $conf{"rmgr"} );
        return;
    }

    if ( $conf{"rmgr"} ne "auto" ) {
        printf("Error, resource manager \"$conf{rmgr}\" not supported\n");
        exit(1);
    }

    my @ok;
    foreach my $res ( sort( keys %rmgr ) ) {
        next unless defined $rmgr{$res}{is_installed};
        if ( $rmgr{$res}{is_installed}() ) {
            push @ok, $res;
        }
    }
    if ( $#ok != 0 ) {
        printf(
"Error, multiple resource managers detected, use -Ormgr=<resource manager>\n"
        );
        push @ok, "local-fd";
        push @ok, "local";
        printf("@ok\n");
        exit(1);
    }

    setup_rmgr( $ok[0] );
}

# Find any active resource manager, that is --any or --all
# have been passed on the command line so look for any resource
# manager that have active jobs, if there is one active resource
# manager use that one, if there are zero or many exit with an
# error.
sub find_any_rmgr {

# If it's been set on the command line and it's valid then just use what we are given.
# Don't do any sanity checks here to cope with non-default installs.

    if ( defined $rmgr{ $conf{"rmgr"} } ) {
        setup_rmgr( $conf{"rmgr"} );
        return;
    }

    if ( $conf{"rmgr"} ne "auto" ) {
        printf("Error, resource manager \"$conf{rmgr}\" not supported\n");
        exit(1);
    }

    my @installed;
    foreach my $res ( sort( keys %rmgr ) ) {
        next unless defined $rmgr{$res}{is_installed};
        if ( $rmgr{$res}{is_installed}() ) {
            push @installed, $res;

        }
    }

    # One resource manager is installed, good.
    if ( $#installed == 0 ) {
        setup_rmgr( $installed[0] );
        return;
    }

    # No resource managers are installed, bad.
    if ( $#installed == -1 ) {
        printf(
"Error, multiple resource managers detected, use -Ormgr=<resource manager>\n"
        );
        push @installed, "local-fd";
        push @installed, "local";
        printf("@installed\n");
        exit(1);
    }

    my @active;
    foreach my $res (@installed) {
        my @jobs = $rmgr{$res}{get_active_jobs}($user);
        if ( $#jobs != -1 ) {
            push @active, $res;
        }
    }

    # Only one resource manager has active jobs, let's use it.
    if ( $#active == 0 ) {
        setup_rmgr( $active[0] );
        return;
    }

    # Multiple resource managers are installed and have jobs,
    # bouce back to the user to specify which one they want.
    printf(
"Error, multiple active resource managers detected, use -Ormgr=<resource manager>\n"
    );
    push @installed, "local-fd";
    push @installed, "local";
    printf("@installed\n");
    exit(1);
}

sub get_all_jobids {
    my $user = shift;
    return $rmgr{ $conf{"rmgr"} }{get_active_jobs}($user);
}

sub job_is_running {
    my $job = shift;

    if ( defined $rmgr{ $conf{"rmgr"} }{job_is_running} ) {
        return $rmgr{ $conf{"rmgr"} }{job_is_running}($job);
    }

    my @jobs = $rmgr{ $conf{"rmgr"} }{get_active_jobs}($user);
    my %j;
    map { $j{$_}++ } @jobs;
    return defined $j{$job};
}

sub job_to_key {
    my $job = shift;

    if ( defined $rmgr{ $conf{"rmgr"} }{job_to_key} ) {
        return $rmgr{ $conf{"rmgr"} }{job_to_key}($job);
    }

    return undef;
}

sub setup_pcmd {
    my $job = shift;
    return $rmgr{ $conf{"rmgr"} }{setup_pcmd}($job);
}

sub cleanup_pcmd {
    my $job = shift;
    if ( defined( $rmgr{ $conf{"rmgr"} }{cleanup_pcmd} ) ) {
        $rmgr{ $conf{"rmgr"} }{cleanup_pcmd}();
    }
}

###############################################################################
#
# Output formatting
#
###############################################################################

sub strip_stack_traces {
    my $lines = shift;

    my %above;
    my %below;

    map { $above{$_}++ } split( ",", $conf{"stack-strip-above"} );
    map { $below{$_}++ } split( ",", $conf{"stack-strip-below"} );

    foreach my $tag ( keys %$lines ) {

        # There was a subtle bug here, functions from the @above_list
        # often appear below main which this code doesn't handle all that
        # well.
        my $main_idx;
        my $wait_idx = 0;
        for ( my $l = 0 ; $l < $#{ $lines->{$tag} } ; $l++ ) {
            if ( $lines->{$tag}->[$l] =~ /(\w*)\(/ ) {
                if ( defined $below{$1} ) {
                    $main_idx = $l;
                }
                if ( defined $above{$1} ) {
                    if ( defined($main_idx) ) {
                        $wait_idx = $l;
                        last;
                    }
                }
            }
        }
        $main_idx = 0 if not defined $main_idx;
        if ( $main_idx != 0 or $wait_idx != 0 ) {
            my $end =
              ( $strip_above_wait and $wait_idx )
              ? $wait_idx
              : $#{ $lines->{$tag} };
            my $start = ( $strip_below_main and $main_idx ) ? $main_idx : 0;

            printf( "Stripping 0.."
                  . $#{ $lines->{$tag} }
                  . " to $start..$end for $tag\n" )
              if $conf{"verbose"} > 1;

            my @new = @{ $lines->{$tag} };
            @new = @new[ $start .. $end ];
            $lines->{$tag} = \@new;
        }
    }
}

sub sort_proc_hashes {
    my $key = shift;
    my @all = @_;

    #print Dumper $all;
    return ( reverse( sort { $a->{$key} <=> $b->{$key} } @all ) );
}

sub pre_mpi_watch {
    my ($cpus) = @_;
    my $header = <<EOF;
u: unexpected messages U: unexpected and other messages
s: sending messages r: receiving messages m: sending and receiving
b: Barrier B: Broadcast g: Gather G: AllGather r: reduce: R: AllReduce
a: alltoall A: alltoalls w: waiting
.: consuming CPU cycles ,: using CPU but no queue data -: sleeping *: error
EOF
    printf($header);
    my %data;
    $data{cpus} = $cpus;
    return \%data;
}

sub show_mpi_watch {
    my ( $handle, $lines ) = @_;

    #     print Dumper $lines;
    my $s = "";
    foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{raw} } ) ) {
        $s .= $lines->{raw}{$l}{state};
    }
    print("$s\n");
}

# Nicely format process information.
# XXX: proc-sort-key should probably sort on column headers as
# well as keys.
sub show_proc_format {
    my ( $nlines, $mode, $handle ) = @_;

    my $lines = $nlines->{lines};

    my @proc_format_array;
    my %proc_format_header;
    my $show_fields = 0;

    my %proc_format_lengths;

    my $separator = $conf{"column-seperator"};

    my @columns = split( ",", $proc_format );
    foreach my $column (@columns) {

        $show_fields = 1 if ( $column eq "fields" );

        my ( $name, $desc ) = split( "=", $column );
        if ( defined $desc ) {
            push @proc_format_array, lc($name);
            $proc_format_header{ lc($name) }  = $desc;
            $proc_format_lengths{ lc($name) } = length($desc);
        } else {
            push @proc_format_array, lc($column);
            $proc_format_header{ lc($column) }  = $column;
            $proc_format_lengths{ lc($column) } = length($column);
        }
    }

    my @all;
    foreach my $tag ( sort ( keys %$lines ) ) {
        my %hash;
        $hash{vp} = $tag;
        foreach my $data ( @{ $lines->{$tag} } ) {
            if ( $data =~ /([\w\.]+)\:[ \t]*(.+)/ ) {
                my $key = lc($1);

                next unless defined $proc_format_lengths{$key} or $show_fields;

                if ( length($2) > $proc_format_lengths{$key} ) {
                    $proc_format_lengths{$key} = length($2);
                }

                $hash{$key} = $2;
            }
        }
        if ($show_fields) {
            my @fields = sort ( keys(%hash) );
            print "@fields\n";
            exit(0);
        }
        push @all, \%hash;
    }

    @all = sort_proc_hashes( $conf{"proc-sort-key"}, @all );

    if ( $conf{"proc-show-header"} ) {
        my @res;
        foreach my $key (@proc_format_array) {
            my $l .= sprintf( "%-$proc_format_lengths{$key}s",
                $proc_format_header{$key} );
            push @res, $l;
        }
        my $line = join( $separator, @res );
        print "$line\n";

        #print "@proc_format_array\n";
    }
    foreach my $hash (@all) {
        my @res;
        my @res;
        foreach my $key (@proc_format_array) {
            my $value = "??";
            if ( defined $hash->{$key} ) {
                $value = $hash->{$key};
            }
            push @res, sprintf( "%$proc_format_lengths{$key}s", $value );
        }
        my $line = join( $separator, @res );
        print "$line\n";
    }

}

sub show_results {
    my ( $nlines, $mode, $handle ) = @_;

    printf Dumper $nlines if $conf{"dump-raw"};

    my $lines = $nlines->{lines};

    if ( defined $allfns{$mode}{out_handler} ) {
        $allfns{$mode}{out_handler}( $handle, $nlines );
        return;
    }

    if ( $mode eq "stack" or $input_file ) {
        if ( $strip_below_main or $strip_above_wait ) {
            strip_stack_traces($lines);
        }
    }

    if ($tree) {
        print show_tree go_p( 0, $lines,
            ( sort { $a <=> $b } ( keys %$lines ) ) );
    } elsif ($compress) {
        foreach my $tag ( sort { $a <=> $b } ( keys %$lines ) ) {
            next if ( !defined( $lines->{$tag} ) );
            my @identical = ();
            foreach my $tag2 ( keys %$lines ) {
                next if ( $tag2 eq $tag );
                if ( cmp_list( \@{ $lines->{$tag} }, \@{ $lines->{$tag2} } ) ) {
                    push( @identical, $tag2 );
                    delete( $lines->{$tag2} );
                }
            }
            print("----------------\n");
            printf( "%s\n", join( ",", compress( @identical, $tag ) ) );
            print("----------------\n");
            foreach my $data ( @{ $lines->{$tag} } ) {
                print("$data\n");
            }
        }
    } elsif ($compress_C) {
        foreach my $tag ( sort { $a <=> $b } ( keys %$lines ) ) {
            print("----------------\n");
            print("$tag\n");
            print("----------------\n");
            foreach my $data ( @{ $lines->{$tag} } ) {
                print("$data\n");
            }
        }
    } elsif ( $mode eq "proc-summary" ) {
        show_proc_format( $nlines, $mode, $handle );
    }
}

###############################################################################
#
# Data collection (parallel and from file).
#
###############################################################################

sub process_line {
    my ( $line, $lines ) = @_;

    if ( $line =~ / *([a-zA-Z]*)\.?([-\d]+):([^\n]+)\n/ ) {
        if ( not $1 ) {
            my $key   = $2;
            my $value = $3;
            if ( $value =~ /raw\:([A-Za-z0-9\+\/\=]*)/ ) {
                push( @{ $lines->{base64}{$key} }, $1 );
            } else {
                push( @{ $lines->{lines}{$key} }, $value );
            }
        } else {
            printf("debug $1.$2: $3\n");
        }
    } else {
        printf("malformed line: $line");
    }
}

sub post_process_lines {
    my $lines = shift;
    return unless exists( $lines->{base64} );
    foreach my $tag ( keys %{ $lines->{base64} } ) {
        $lines->{raw}{$tag} =
          thaw( decode_base64( join( "\n", @{ $lines->{base64}{$tag} } ) ) );
    }
}

sub go_file {
    my $file = shift;
    my $mode = shift;

    if ( $stats_total or $group ) {
        my @data;

        open( PCMD, "$file" ) or die "$prog: cant open file $file: $!\n";
        local $/ = "\n\n";
        while (<PCMD>) {
            s/\n//g;
            push @data, $_;
        }
        my $s = read_stats(@data);

        show_stats($s);

        return;
    }

    if ( not $line_formatted ) {
        die("input file specified but no formatting selected\n");
    }

    open( PCMD, "$file" ) or die "$prog: cant open file $file: $!\n";
    my @data = <PCMD>;
    close(PCMD);

    my %lines;    # A hash of arrays.

    foreach my $line (@data) {
        process_line( $line, \%lines );
    }
    post_process_lines( \%lines );
    show_results( \%lines, $mode, undef );
}

sub rc_status {
    my $status = shift;
    my %rc;

    $rc{'rc'}     = $status >> 8;
    $rc{'core'}   = ( $status & 128 ) >> 7;
    $rc{'signal'} = $status & 127;

    return %rc;
}

sub maybe_clear_screen {
    if ( $conf{"watch-clears-screen"} ) {
        printf( "%s", " \033[1;1H" );
        printf( "%s", "\033[2J" );
    }
}

sub go_job_once {
    my $jobid = shift;
    my $cmd   = shift;
    my $ncpus = shift;
    my $raw   = shift;
    my $stats = shift;
    my $mode  = shift;
    my $h     = shift;

    my $errors = 0;

    my $report_errors = 1;

    $report_errors = 0 if ($full_report);

    my $pcmd = {
        pid => -1,
        in  => "",
        out => *OUT,
        err => *ERR,
    };

    # According to the docs there is potential for deadlock here
    # if the amount of data coming in is enough to fill the buffers
    # We should really use IO::select it's not clear to me how
    # you detect EOF in that case and this works for now.

    $pcmd->{pid} = open3( $pcmd->{in}, *OUT, *ERR, $cmd )
      or die "Unable to open3() pcmd: $!\n";

    close $pcmd->{in};

    {
        my %lines;
        my @data;

        if ($raw) {
            my $handle = $pcmd->{out};
            while (<$handle>) {
                my $line = $_;
                print("$line");
            }
        } else {
            if ($stats) {
                local $/ = "\n\n";
                my $handle = $pcmd->{out};
                while (<$handle>) {
                    s/\n//g;
                    push @data, $_;
                }
            } elsif ($line_formatted) {
                my $handle = $pcmd->{out};
                while (<$handle>) {
                    my $line = $_;
                    process_line( $line, \%lines );
                }
                post_process_lines( \%lines );
            }
        }

        my $handle = $pcmd->{err};
        while (<$handle>) {
            my $line = $_;
            if ($report_errors) {
                print( STDERR "Error ($jobid,$mode): $line" );
            }
            $errors++;
        }

        close $pcmd->{in};
        close $pcmd->{out};
        close $pcmd->{err};

        waitpid( $pcmd->{pid}, 0 );
        my $res = $?;

        printf("result from parallel command was $res\n")
          if ( $conf{"verbose"} );

        if ( $res != 0 ) {
            my %status = rc_status($res);
            if ( job_is_running($jobid) ) {
                if ($report_errors) {
                    printf(
                        "Failed to run parallel command (rc = $status{rc})\n");
                }
            } else {
                printf("Job $jobid is no longer active\n");
                return 1;
            }
        }

        if ($stats) {
            if ( $conf{"stats-raw"} ) {
                local $, = "\n\n";
                print @data;
                print "\n";
            } else {
                my $s = read_stats(@data);
                show_stats($s);
            }
        } elsif ($line_formatted) {
            if ( defined $ncpus ) {
                for ( my $vp = 0 ; $vp < $ncpus ; $vp++ ) {
                    push(
                        @{ $lines{lines}{$vp} },
                        "no output for this process"
                    ) if ( not defined $lines{lines}{$vp}[0] );
                }
            }
            show_results( \%lines, $mode, $h );
        }
    }

    return $errors;

}

sub go_job {
    my $jobid = shift;
    my $mode  = shift;
    my $rops  = "";

    if ( defined $mode ) {
        $rops .= " --$allfns{$mode}{arg_long}";

        if ( defined $allfns{$mode}{secondary} ) {
            foreach my $sec ( @{ $allfns{$mode}{secondary} } ) {
                $rops .= " --$sec->{arg_long}=$sec->{value}";
            }
        }
    }

    my $key = job_to_key($jobid);

    my $cmd;
    my $ncpus;

    my $stats;

    foreach my $rank (@ranks) {
        $rops .= " --rank=$rank";
    }

    $conf{"verbose"} && print "Attaching to job $jobid\n";

    $rem_jobid = $jobid;

    # Setup whatever is needed for running parallel commands, note this might
    # involve setting environment variables.
    my @res = setup_pcmd($jobid);

    return 1 unless (@res);

    $cmd   = $res[0];
    $ncpus = $res[1];

    $conf{"verbose"} && defined $ncpus && print "Job has $ncpus cpus\n";

    # Some versions of perl like to have a space after the O and report that
    # -ormgr isn't a valid option if it's not there, perhaps this is a bug
    # in GetOptions but for now just work around it.
    foreach my $opt ( keys %cinner ) {
        $rops .= " -O $opt=\"$cinner{$opt}\"";
    }

  # Maybe do it this way, edb works best when run with the same LD_LIBRARY_PATH
  # as the application.  It's very important when running the message queue
  # extraction code but less so here.  You may find you get linker errors though
  # although they shouldn't be to hard to work around.

    # Another problem, if using slurm then the key isn't valid, you need to
    # convert from jobId to key locally on the node, hence you need to use
    # a padb-helper process
    if ( $stats_total or $group ) {
        $stats = 1;
        if ( defined $key ) {
            $cmd .=
              " $conf{edb} --stats-raw --parallel --key=$key $conf{edbopt}";
        } else {
            $cmd .=
              " $0 --inner --jobid=$rem_jobid $rops --stats-full $conf{edbopt}";
        }
    } else {
        $rops .= " --line-formatted" if ( $line_formatted or $#ranks != 0 );
        $cmd .= " $0 --inner --jobid=$rem_jobid" . $rops;
    }

    ( $conf{"verbose"} > 1 or $conf{"showcmd"} ) && print "$cmd\n";

    my $raw = ( ( not $stats ) and ( not $line_formatted ) );

    my $h;
    if ( defined $allfns{$mode}{pre_out_handler} ) {
        $h = $allfns{$mode}{pre_out_handler}($ncpus);
    }

    # This makes thing easier...
    if ($watch) {
        while (1) {
            maybe_clear_screen();
            my $errors =
              go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode, $h );
            if ( $errors != 0 ) {
                cleanup_pcmd();
                return $errors;
            }
            sleep( $conf{"interval"} );
        }
    }
    my $errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode, $h );
    cleanup_pcmd();
    return $errors;
}

###############################################################################
#
# Outer main
#
###############################################################################

sub cmdline_error {
    my $str = shift;
    print STDERR $str;
    exit(1);
}

sub config_init {
    map { $ic_names{$_}++ } @inner_conf;
}

sub config_set {
    my ( $key, $value ) = @_;
    printf("Setting '$key' to '$value'\n") if ( $conf{"verbose"} );

    if ( not exists $conf{$key} ) {
        printf( STDERR
              "Warning, unknown config option '$key' value '$value'.\n" );
    }

    $conf{$key} = $value;

    if ( defined $ic_names{$key} ) {
        $cinner{$key} = $value;
    }
}

sub config_from_file {
    my $file = shift;

    printf("Loading config from \"$file\"\n") if ( $conf{"verbose"} );
    open( CFILE, $file ) or return;

    while (<CFILE>) {
        if (/^([\w-]+)\s*\=\s*(.*)/) {
            my $key   = $1;
            my $value = $2;
            config_set( $key, $value );
        }
    }
    close(CFILE);
    return;
}

sub config_from_env {
    printf("Loading config from environment\n") if ( $conf{"verbose"} );

    foreach my $key ( keys(%conf) ) {
        my $name = uc($key);
        $name =~ s/\-/\_/g;
        if ( defined $ENV{"PADB_$name"} ) {
            config_set( $key, $ENV{"PADB_$name"} );
        }
    }
}

sub config_help {
    printf("Current options are:\n");

    my $max_len = 0;

    foreach my $key ( keys(%conf) ) {
        if ( length $key > $max_len ) {
            $max_len = length $key;
        }
    }

    foreach my $key ( sort( keys(%conf) ) ) {
        my $name = $key;
        $name =~ s/\_/\-/g;
        if ( defined $conf{$key} ) {
            printf( " %$max_len" . "s = '$conf{$key}'\n", $name );
        } else {
            printf( " %$max_len" . "s = unset\n", $name );
        }
    }
}

sub outer_main {

    my $mode = parse_args_outer();

    if ( getpwnam($user) eq "" ) {
        print STDERR "$prog: Error: no such user as '$user'\n";
        exit 1;
    }

  # Load from the config files first, then the env and finally the command line.

    config_init();

    config_from_file($configfile);

    config_from_file("$ENV{HOME}/.padbrc") unless ( $norc == 1 );

    config_from_env();

    printf("Loading config from command line\n") if ( $conf{"verbose"} );

    #
    # Once again there is a 'bugette' here, you cant pass the
    # first of these strings through due to the split hacking
    # off everything to the right of the second equals sign
    # however you can do the second.
    # -Oedbopt="--pagesize=8192 --pagesize-header=4096"
    # -Oedbopt="--pagesize 8192 --pagesize-header 4096"
    foreach my $config_option (@config_options) {
        my @pairs = split( ",", $config_option );

        foreach my $pair (@pairs) {
            my ( $name, $val ) = split( "=", $pair );

            # $name =~ s/\-/\_/g;

            if ( $name eq "scriptDir" ) {
                printf(
"$prog: -OscriptDir deprecated, use -Oedb=/path/to/edb instead\n"
                );
                exit(1);
            }

            if ( !exists $conf{$name} ) {
                printf("Error, unknown config option '$name'\n");
                config_help();
                exit(1);
            }
            if ( !defined $val ) {
                printf("Error, config option '$name' requires value\n");
                config_help();
                exit(1);
            }
            config_set( $name, $val );
        }
    }

    if ($list_rmgrs) {
        foreach my $res ( sort( keys %rmgr ) ) {
            my $working = "yes";

            if ( defined $rmgr{$res}{is_installed}
                and not $rmgr{$res}{is_installed}() )
            {
                $working = "no";
            }
            my $r = $res;

            if ( $working eq "yes" ) {
                printf("$r: ");
                my @jobs = $rmgr{$res}{get_active_jobs}($user);
                if ( $#jobs > -1 ) {
                    my $j = join( " ", sortn(@jobs) );
                    printf("jobs($j)\n");
                } else {
                    printf("No active jobs\n");
                }
            } else {
                printf("$r: not active\n");
            }
        }
        exit(0);
    }

    if ($core_stack) {
        if ( not defined $core_name or not defined $exe_name ) {
            printf(
                "Usage $0 --core-stack --core=<corefile> --exe=<executable>\n");
            exit(1);
        }
        if ( not -f $exe_name ) {
            printf("Error: executable file '$exe_name' does not exist!\n");
            exit(1);
        }
        if ( not -f $core_name ) {
            printf("Error: core file '$core_name' does not exist!\n");
            exit(1);
        }
        stack_from_core( $exe_name, $core_name );
        exit(0);
    }

    if ($full_report) {

        find_rmgr();

        if ( not job_is_running($full_report) ) {
            printf( STDERR
"Job $full_report is not active, use --show-jobs to see active jobs\n"
            );
            exit(1);
        }

        printf("padb version $version\n");
        printf("full job report for job $full_report\n\n");

        # Bit of a cheat here, do two things at once...
        # This should probably me modified to work better on
        # non Quadrics systems.
        my $res;
        $stats_total = 1;
        $group       = 1;
        $res         = go_job( $full_report, "full-report" );
        undef $stats_total;
        undef $group;

        # Don't exit on failure here.
        #if ( $res != 0 ) {
        #    exit 1;
        #}

        printf("\n");

        $line_formatted = 1;

        $compress = 1;
        go_job( $full_report, "queue" );
        undef $compress;

        printf("\n");

        $strip_above_wait = 0;
        $countoutput      = 1;
        $tree             = 1;
        go_job( $full_report, "stack" );
        undef $tree;

        exit 0;
    }

    if ($show_jobs) {
        find_rmgr();
        my @jobids = get_all_jobids($user);
        print("@jobids\n");
        exit(0);
    }

    if ($local_stats) {

        if ($watch) {
            while (1) {
                maybe_clear_screen();
                local_stats();
                sleep( $conf{"interval"} );
            }
        } else {
            local_stats();
        }
        exit(0);
    }

    if ( $all or $any ) {
        if ( $#ARGV ne "-1" ) {
            cmdline_error(
                "$prog: Error: --all incompatible with specific ids\n");
        }
    } elsif ( !$input_file ) {
        if ( $#ARGV eq "-1" ) {
            cmdline_error(
                "$prog: Error: no jobs specified, use --all or jobids\n");
        }
    }

    if ( ( grep { $_ } ( $any, $all, $input_file ) ) > 1 ) {
        cmdline_error(
            "$prog: Error: only specify one of --all, --any or --input_file,\n"
        );
    }

    $line_formatted = ( grep { $_ } ( $compress, $compress_C, $tree ) );
    if ( $line_formatted > 1 ) {
        cmdline_error(
"$prog: Error: only specify one of --compress, --compress-long or --tree\n"
        );
    }

    if ( defined $mode && $mode eq "proc-summary" ) {
        $line_formatted = 1;
    }

    if ( defined $mode && defined $allfns{$mode}{out_handler} ) {
        $line_formatted = 1;
    }

    if ( not $input_file
        and
        ( ( grep { $_ } ( $stats_total, $group, $have_allfns_option ) ) != 1 )
        or ( $have_allfns_option > 1 ) )
    {
        cmdline_error(
"$prog: Error: you must specify only one of -x, -S, -s, -g, -q, -X or --kill\n"
        );
    }

    # If delivering a signal check that it's valid.
    if ( defined($mode) and ( $mode eq "kill" ) ) {
        my $signal = uc( $secondary_args{signal} );
        my %sig_names;
        map { $sig_names{$_} = 1 } split( " ", $Config{"sig_name"} );

        if ( not defined $sig_names{$signal} ) {
            cmdline_error("$prog: Error: signal $signal is invalid\n");
        }
    }

    if ( $tree and !( ( defined $mode && $mode eq "stack" ) or $input_file ) ) {
        cmdline_error("$prog: Error: --tree only works with --stack-trace\n");
    }

    if ( ( ( grep { $_ } ($stats_total) ) == 1 )
        and $line_formatted )
    {
        cmdline_error(
"$prog: Error: requested output not compatible with requested formatting\n"
        );
    }

    $countoutput = 1
      if ( ( defined $mode and $mode eq "stack" ) or $conf{"verbose"} );

    if ( defined($input_file) ) {
        my $m = "input";
        if ( defined $mode ) {
            $m = $mode;
        }
        go_file( $input_file, $m );
        exit(0);
    }

    my @jobids;

    if ( $any or $all ) {

        find_any_rmgr();

        @jobids = get_all_jobids($user);
        printf( "Active jobs (%d) are @jobids\n", $#jobids + 1 )
          if $conf{"verbose"};
        if ( $#jobids == -1 ) {
            printf("No active jobs could be found for user '$user'\n");
            exit 1;
        }
        if ( $any && $#jobids != 0 ) {
            printf("More than 1 active job (@jobids) for user '$user'\n");
            exit 1;
        }
    } else {
        find_rmgr();

        foreach my $jobid (@ARGV) {
            if ( job_is_running($jobid) ) {
                push @jobids, $jobid;
            } else {
                printf( STDERR "Job $jobid is not active\n" );
            }
        }
    }

    if ( $#jobids > 0 and $watch ) {
        printf("Cannot use --watch with more than one job\n");
        exit(1);
    }

    foreach my $jobid (@jobids) {

        printf "\nCollecting information for job '$jobid'\n\n"
          if ( $conf{"verbose"} or ( $#jobids > 0 ) );

        go_job( $jobid, $mode );
    }
}

###############################################################################
#
# Inner.
#
###############################################################################

# The code below here used to be in a separate script (padb-helper.pl) but
# it's become apparent that for ease-of-distribution padb works better if it
# is self-contained in one file.  Now we just have a big switch on ARGV[0]
# and either run the inner or outer code depending on if it's set or not.

my %confInner;

sub debug {
    my ( $vp, $str ) = @_;
    $confInner{"verbose"} or return;
    $vp = -1 unless defined $vp;
    print "$confInner{hostname}.$vp:$str\n";

}

sub output_dtype {
    my ( $vp, $ref ) = @_;
    if ( defined $vp ) {
        my $p = nfreeze $ref;
        my $q = encode_base64($p);
        foreach my $l ( split( "\n", $q ) ) {
            print "$vp:raw:$l\n";
        }
    } else {
        my $str = Dumper $ref;
        print "$confInner{hostname}.-1:ERROR: $str\n";
    }
}

sub output {
    my ( $vp, $str ) = @_;
    if ( $confInner{"lineformatted"} ) {
        if ( defined $vp ) {
            print "$vp:$str\n";
        } else {
            print "$confInner{hostname}.-1:ERROR: $str\n";
        }
    } else {
        print "$str\n";
    }
}

sub p_die {
    my ( $vp, $str ) = @_;
    $confInner{"verbose"}++;
    debug( $vp, "$str, '$@'" );
    exit(1);
}

sub is_parent_resmgr {
    my $pid = shift;
    my $parent_pid = find_from_status( $pid, "PPid" );
    return is_resmgr_process($parent_pid);
}

$SIG{PIPE} = 'IGNORE';

sub gdb_start {
    my ( $exe, $core ) = @_;
    my $gdb = {
        gdbpid   => -1,
        tracepid => -1,
        attached => 0,
        rdr      => "",
        wtr      => "",
        err      => "",
    };

    my $cmd = "gdb --interpreter=mi -q";
    if ( defined $core ) {
        $cmd .= " $exe $core";
    }

    $gdb->{gdbpid} = open3( $gdb->{wtr}, $gdb->{rdr}, $gdb->{err}, $cmd )
      or die "Unable to popen() gdb: $!\n";

    return $gdb;
}

sub gdb_quit {
    my ($gdb) = @_;
    my $result = gdb_send( $gdb, "quit" );
    waitpid( $gdb->{gdbpid}, 0 );
    close( $gdb->{rdr} );
    close( $gdb->{wtr} );
    close( $gdb->{err} );
    return;
}

sub gdb_attach {
    my ( $gdb, $pid ) = @_;

    my $result = gdb_send( $gdb, "attach $pid" );

    return if ( $result eq "error" );

    $gdb->{attached} = 1;
    $gdb->{tracepid} = $pid;

    return $pid;
}

sub gdb_detach {
    my ($gdb) = @_;
    my $result = gdb_send( $gdb, "-target-detach" );

    return if ( $result eq "error" );

    $gdb->{attached} = 0;

    return $gdb->{tracepid};
}

sub gdb_wait_for_prompt {
    my ($gdb) = shift;
    my $handle = $gdb->{rdr};
    while (<$handle>) {
        return if /^\(gdb\)/;
    }

    return;
}

sub gdb_n_send {
    my ( $gdb, $cmd ) = @_;
    gdb_wait_for_prompt($gdb);
    my $handle = $gdb->{wtr};
    print $handle "$cmd\n";
    my %r = gdb_n_next_result($gdb);
    $r{cmd} = $cmd;
    return %r;
}

sub gdb_send {
    my ( $gdb, $cmd ) = @_;
    my %p = gdb_n_send( $gdb, $cmd );
    return $p{status};
}

sub strip_square {
    my $str = shift;
    $str =~ /^\[(.*)\]$/;
    return $1;
}

sub strip_soft {
    my $str = shift;
    $str =~ /^\{(.*)\}$/;
    return $1;
}

sub strip_quotes {
    my $str = shift;
    $str =~ /^\"(.*)\"$/;    #"
    return $1;
}

sub strip_leading_comma {
    my $str = shift;
    $str =~ /^,(.*)$/;
    return $1;
}

sub strip_first_quotes {
    my $str = shift;
    $str =~ s/\\\"/REALLYBAD/g;      #"
    $str =~ /^\"([^\"]*)\"(.*)$/;    #"
    my $val      = $1;
    my $leftover = $2;

    $val      =~ s/REALLYBAD/\"/g;      #"
    $leftover =~ s/REALLYBAD/\\\"/g;    #"
    return ( $val, $leftover );

}

# Has to return key (str) value (complex) extra(string)
sub extract_value_square {
    my $str = shift;

    my $left   = "";
    my $right  = $str;
    my $indent = 0;

    while ( $right =~ /^([^\[\]]*)([\[\]])(.*)$/ ) {
        if ( $2 eq "[" ) {
            $indent++;
            $left  = "$left$1\[";
            $right = $3;
        } else {
            $indent--;
            $left  = "$left$1\]";
            $right = $3;
            if ( $indent == 0 ) {
                return ( strip_square($left), $right );
            }
        }

        # printf("$2 $indent\nleft  '$left'\nright '$right'\n\n\n\n");
    }
    printf("ident $indent\n");
}

sub extract_value_soft {
    my $str = shift;

    my $left   = "";
    my $right  = $str;
    my $indent = 0;

    while ( $right =~ /^([^\{\}]*)([\{\}])(.*)$/ ) {
        if ( $2 eq "{" ) {
            $indent++;
            $left  = "$left$1\{";
            $right = $3;
        } else {
            $indent--;
            $left  = "$left$1\}";
            $right = $3;
            if ( $indent == 0 ) {
                return ( strip_soft($left), $right );
            }
        }

        # printf("$2 $indent\nleft  '$left'\nright '$right'\n\n\n\n");
    }
    printf("ident $indent\n");
}

sub new_parse {
    my $str      = shift;
    my $collapse = shift;

    # printf("Parsing\t\t\t\t\t\t$str\n");

    my %res;
    my $key;
    my $value;

    if ( $str =~ /^([\w\-\?]+)\=(.*)$/ ) {
        $key   = $1;
        $value = $2;
    } else {
        $key   = "tuple";
        $value = $str;
    }
    my $leftover;

    # printf("Got key/value pair! $key\n");
    my $type = substr( $value, 0, 1 );
    if ( $type eq "[" ) {
        if ( $value eq "[]" ) {
            my @e;
            return ( $key, \@e, "" );
        }
        my ( $l, $r ) = extract_value_square($value);
        $leftover = $r;

        # printf("Got value\n$l\n$r\n");

        my @b;
        while ( $l ne "" ) {
            my ( $kk, $vv, $c ) = new_parse( $l, $collapse );

            # Assert that $c is empty?
            $l = "";
            if ( $c ne "" ) {
                $c = strip_leading_comma($c);
                $l = $c;
            }
            my %q;
            if ( $kk eq "tuple" or defined $collapse and $kk eq $collapse ) {
                push @b, $vv;
            } else {
                $q{$kk} = $vv;

                # push @b,$vv;
                push @b, \%q;
            }
        }
        return ( $key, \@b, $r );
    } elsif ( $type eq "{" ) {
        my ( $l, $r ) = extract_value_soft($value);
        $leftover = $r;

        # printf("Got value\n$l\n$r\n");

        my @all;
        while ( $l ne "" ) {
            my ( $kk, $vv, $c ) = new_parse( $l, $collapse );

            $l = "";
            if ( $c ne "" ) {
                $c = strip_leading_comma($c);
                $l = $c;
            }
            if ( defined $collapse and $key eq "thread-ids" ) {
                my %r;
                $r{$kk} = $vv;
                push @all, \%r;
            } else {
                $res{$kk} = $vv;
            }
        }
        if ( defined $collapse and $key eq "thread-ids" ) {
            return ( $key, \@all, $r );
        } else {
            return ( $key, \%res, $r );
        }
    } elsif ( $type eq "\"" ) {
        my ( $this, $l ) = strip_first_quotes($value);
        return ( $key, $this, $l );
    } else {
        die("unknown type '$type' str '$str'");
    }

    return ( $key, \%res, $leftover );

}

sub gdb_parse_reason {
    my $str      = shift;
    my $collapse = shift;

    my $leftover = $str;
    my %res;
    while ( $leftover ne "" ) {
        my ( $key, $value, $l ) = new_parse( $leftover, $collapse );
        $leftover = "";
        if ( $l ne "" ) {
            $leftover = strip_leading_comma($l);
        }
        $res{$key} = $value;
    }
    return \%res;
}
#########################################################################

sub gdb_n_next_result {
    my ($gdb) = shift;
    my $handle = $gdb->{rdr};

    my %res;

    while (<$handle>) {
        return %res if /^\(gdb\)/;
        if (/\~\"(.*)\"\n/) {    #"
            $res{raw} .= $1;
        }
        if (/\&\"(.*)\"\n/) {    #"
            $res{debug} .= $1;
        }
        if (/^\^(done|error),?(.*)$/) {
            $res{status} = $1;
            if ( defined $2 and $2 ne "" ) {
                $res{reason} = $2;

                # $current_parsed = $2;
            }
            if ( defined $res{raw} ) {
                $res{raw} =~ s/\\n/\n/g;
                chomp $res{raw};
            }
            if ( defined $res{debug} ) {
                $res{debug} =~ s/\\n/\n/g;
                chomp $res{debug};
            }
            return %res;
        }
    }
    if ( defined $res{raw} ) {
        $res{raw} =~ s/\\n/\n/g;
        chomp $res{raw};
    }
    if ( defined $res{debug} ) {
        $res{debug} =~ s/\\n/\n/g;
        chomp $res{debug};
    }

    return %res;
}

sub gdb_strip_value {
    my $str = shift;
    $str =~ /value=\"(.+)\"$/;    #"
    return $1;
}

sub gdb_strip_quotes {
    my $str = shift;
    $str =~ /^\"(.*)\"$/;         #"
    return $1;
}

sub gdb_type_size {
    my ( $gdb, $type ) = @_;
    my %p = gdb_n_send( $gdb, "-data-evaluate-expression sizeof($type)" );
    return undef unless ( $p{status} eq "done" );
    return gdb_strip_value( $p{reason} );
}

sub gdb_type_offset {
    my ( $gdb, $type, $field ) = @_;
    my %p =
      gdb_n_send( $gdb, "-data-evaluate-expression \"&(($type *)0)->$field\"" );
    return undef unless ( $p{status} eq "done" );
    return hex( gdb_strip_value( $p{reason} ) );
}

sub gdb_func_addr {
    my ( $gdb, $func ) = @_;
    my %p = gdb_n_send( $gdb, "-data-evaluate-expression $func" );
    return undef unless ( $p{status} eq "done" );
    my $value = gdb_strip_value( $p{reason} );
    my @a     = split( " ", $value );
    my $hex   = $a[-2];
    return $hex;
}

sub gdb_var_addr {
    my ( $gdb, $var ) = @_;
    my %p = gdb_n_send( $gdb, "-data-evaluate-expression &$var" );
    return undef unless ( $p{status} eq "done" );
    $p{reason} =~ /value=\"(.+)\"$/;    #"
    return $1;
}

sub gdb_read_raw {
    my ( $gdb, $ptr, $size ) = @_;

    my @d;
    my $offset = 0;
    my $count  = 256;
    do {
        $count = $size if ( $size < $count );
        my %p =
          gdb_n_send( $gdb, "-data-read-memory -o $offset $ptr x 1 1 $count" );
        $offset += $count;

        return undef unless ( $p{status} eq "done" );
        my $val = gdb_parse_reason( $p{reason}, "thread-ids" );
        push( @d, @{ $val->{memory}[0]{data} } );

    } while ( $offset < $size );
    return @d[ 0 .. $size - 1 ];
}

sub gdb_string {
    my ( $gdb, $strp ) = @_;
    my $offset = 0;
    my $str    = "";
    my @s      = gdb_read_raw( $gdb, $strp, 128 );
    return undef if ( $s[0] eq undef );
    foreach my $d (@s) {
        my $v = hex($d);
        return $str if ( $v == 0 );
        $str .= sprintf( "%c", $v );
    }
    return $str;
}

sub handle_query {
    my ( $gdb, $vp, $query, $stats ) = @_;

    my @params = split( " ", $query );
    my $b      = shift @params;
    my $cmd    = shift @params;
    my $res;
    return "fail" unless defined $cmd;
    if ( $cmd eq "size" ) {
        $res = gdb_type_size( $gdb, $params[0] );
        $stats->{size}++;
    } elsif ( $cmd eq "offset" ) {
        $res = gdb_type_offset( $gdb, $params[0], $params[1] );
        $stats->{offset}++;
    } elsif ( $cmd eq "string" ) {
        my $str = gdb_string( $gdb, $params[1] );
        if ( defined $str ) {
            $stats->{string}++;
            $res = $str;
        }
    } elsif ( $cmd eq "func" ) {
        $res = gdb_func_addr( $gdb, $params[0] );
        $stats->{function}++;
    } elsif ( $cmd eq "sym" ) {
        $res = gdb_var_addr( $gdb, $params[0] );
        $stats->{symbol}++;
    } elsif ( $cmd eq "data" ) {
        my @r = gdb_read_raw( $gdb, $params[0], $params[1] );
        if ( $r[0] ne undef ) {
            $res = "@r";
            $stats->{datareads}++;
            $stats->{databytes} += $params[1];
        }
    } elsif ( $cmd eq "rank" ) {
        $res = $vp;
        $stats->{rank}++;
    } elsif ( $cmd eq "image" ) {
        my $image = readlink("/proc/$gdb->{tracepid}/exe");
        if ( defined $image ) {
            $res = $image;
        }
    } else {
        printf("Unhandled query $query\n");
    }
    if ( defined $res ) {
        return "ok $res";
    }
    $stats->{errors}++;

    return "fail";
}

sub launch_h {
    my ( $gdb, $vp ) = @_;

    my $h = {
        hpid     => -1,
        tracepid => -1,
        attached => 0,
        rdr      => "",
        wtr      => "",
        err      => "",
    };
    my @mq;

    my $cmd = $confInner{"minfo"};
    $h->{hpid} = open3( $h->{wtr}, $h->{rdr}, $h->{err}, $cmd )
      or die "Unable to popen() h: $!\n";

    my $handle = $h->{rdr};

    my $out = $h->{wtr};

    my %stats;

    while (<$handle>) {
        my $r = $_;
        chomp $r;
        if ( $r =~ /^req:/ ) {
            my $res = handle_query( $gdb, $vp, $r, \%stats );
            if ( defined $res ) {
                print $out "$res\n";
            }

            # Some things *do* fail here, symbol lookups
            # and we don't need to report it.
            if ( $res eq "fail" ) {
                debug( $vp, "Failed dll request $r\n" );
            }
        } else {
            push @mq, $r;
        }
    }

    waitpid( $h->{hpid}, 0 );
    close( $h->{rdr} );
    close( $h->{wtr} );
    close( $h->{err} );

    # Useful for tuning the dll itself...
    # print Dumper \%stats;

    return @mq;
}

sub fetch_mpi_queue {
    my ( $vp, $pid ) = @_;
    my $g = gdb_start();
    kill( "CONT", $pid );
    my $p = gdb_attach( $g, $pid );
    if ( !$p ) {
        debug( $vp, "Failed to attach to $pid\n" );
        return;
    }

    if ( $confInner{"mpi-dll"} ne "auto" ) {
        $ENV{MPINFO_DLL} = $confInner{"mpi-dll"};
    } else {
        my $base = gdb_var_addr( $g, "MPIR_dll_name" );
        if ( !defined $base ) {
            gdb_detach($g);
            gdb_quit($g);
            return;
        }
    }

    my @mq = launch_h( $g, $vp );
    gdb_detach($g);
    gdb_quit($g);
    return @mq;
}

# As above but take a gdb handle
sub fetch_mpi_queue_gdb {
    my ( $vp, $pid, $g ) = @_;

    if ( $confInner{"mpi-dll"} ne "auto" ) {
        $ENV{MPINFO_DLL} = $confInner{"mpi-dll"};
    } else {
        my $base = gdb_var_addr( $g, "MPIR_dll_name" );
        if ( !defined $base ) {
            return;
        }
    }

    my @mq = launch_h( $g, $vp );
    return @mq;
}

sub show_mpi_queue {
    my ( $vp, $pid ) = @_;

    my @mq = fetch_mpi_queue( $vp, $pid );
    foreach my $o (@mq) {
        output( $vp, $o );
    }
}

# Should do something clever here with handler_all so we get a single
# consistent sample from the individual nodes, the handler_all code
# doesn't do anything with output_dtype() yet however so give that
# a miss for the time being.
sub show_mpi_queue_for_deadlock {
    my ( $vp, $pid ) = @_;

    my @mq = fetch_mpi_queue( $vp, $pid );
    return \@mq;
}

# Ideally handle all this at a higher level...
sub show_mpi_queue_for_deadlock_all {
    my ($list) = @_;

    my @all;

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        debug $vp, "Attaching to $pid";
        my $gdb = gdb_start();
        kill( "CONT", $pid );
        if ( gdb_attach( $gdb, $pid ) ) {
            $proc->{gdb} = $gdb;
            push( @all, $proc );
        } else {
            output $vp, "Failed to attach to to process";
        }

    }

    foreach my $proc (@all) {
        my $tries = 0;

        my @threads;

        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};
        my $gdb = $proc->{gdb};

        my @mq = fetch_mpi_queue_gdb( $vp, $pid, $gdb );
        output_dtype( $vp, \@mq );
    }

    foreach my $proc (@all) {
        my $gdb = $proc->{gdb};
        gdb_detach($gdb);
        gdb_quit($gdb);
    }
}

sub go_deadlock_detect {
    my ($cd) = @_;

    # print Dumper $cd;
    my %ad;

    my @tg;

    if ( $#target_groups != -1 ) {
        foreach my $gid (@target_groups) {
            $tg[$gid]++;
        }
    }

    foreach my $process ( keys( %{$cd} ) ) {
        my $rd = $cd->{$process};
        foreach my $g ( keys( %{$rd} ) ) {
            my $gd  = $rd->{$g};
            my $gid = $gd->{id};

            if ( $#target_groups != -1 ) {
                next unless defined $tg[$gid];
            }

            if ( $gd->{size} > 1 ) {
                $ad{$gid}{map}[ $gd->{rank} ] = $process;
            }
            $ad{$gid}{size} = $gd->{size};
            $ad{$gid}{name} = $gd->{name};
            foreach my $coll ( keys( %{ $gd->{coll} } ) ) {
                my $count = $gd->{coll}{$coll}{count};
                if ( defined $gd->{coll}{$coll}{active} ) {
                    $ad{$gid}{active}{$coll}++;
                    $ad{$gid}{idents}{ $gd->{rank} }{'active'}{$coll} = $count;
                } else {
                    $ad{$gid}{idents}{ $gd->{rank} }{'inactive'}{$coll} =
                      $count;
                }
            }
        }
    }

    #print Dumper \%ad;
    my $ret     = "";
    my $i_count = 0;    # Interesting groups.
    foreach my $gid ( sort { $a <=> $b } keys %ad ) {

        if ( $#target_groups != -1 ) {
            next unless defined $tg[$gid];
        }

        my $gstr = "Information for group '$gid' ($ad{$gid}{name})\n";

        # Maybe show the group members, hope that the user doesn't turn
        # this on unless also setting target_groups!
        if ( $conf{"show-group-members"} ) {
            $gstr .= "group has $ad{$gid}{size} members\n";
            if ( defined $ad{$gid}{size} and $gid != 1 ) {
                for ( my $ident = 0 ; $ident < $ad{$gid}{size} ; $ident++ ) {
                    $gstr .=
                      "group member[$ident] => vp[$ad{$gid}{map}[$ident]]\n";
                }
            }
        }

        if ( $ad{$gid}{'active'} ) {
            $i_count++;

            # For all collective calls which we are interested in
            foreach my $s ( keys %{ $ad{$gid}{'active'} } ) {
                my %active;
                my %inactive;

                foreach my $ident ( keys %{ $ad{$gid}{'idents'} } ) {
                    if ( defined $ad{$gid}{'idents'}{$ident}{'active'}
                        and $ad{$gid}{'idents'}{$ident}{'active'}{$s} )
                    {
                        my $number = $ad{$gid}{'idents'}{$ident}{'active'}{$s};
                        push( @{ $active{$number} }, $ident );
                    } elsif ( $ad{$gid}{'idents'}{$ident}{'inactive'}{$s} ) {
                        my $number =
                          $ad{$gid}{'idents'}{$ident}{'inactive'}{$s};
                        push( @{ $inactive{$number} }, $ident );
                    }
                }
                foreach my $number ( sort ( keys %active ) ) {
                    $ret .= $gstr
                      . group_status_helper( "in call $number to $s",
                        0, $ad{$gid}{size}, @{ $active{$number} } );
                    $gstr = "";

                }
                foreach my $number ( sort ( keys %inactive ) ) {
                    $ret .= group_status_helper( "completed call $number to $s",
                        1, $ad{$gid}{size}, @{ $inactive{$number} } );
                }
            }
        } else {
            next unless ( $conf{"show-all-groups"} );
        }

        {
            my @inactive;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {

    #                if ( $ad{$gid}{'idents'}{$ident}{'statistics'}
    #                    and not defined $ad{$gid}{'idents'}{$ident}{'active'} )
                if ( not defined $ad{$gid}{'idents'}{$ident}{'active'} ) {
                    push( @inactive, $ident );
                }
            }
            if ( $#inactive != -1 ) {
                $ret .= $gstr
                  . group_status_helper( "not in a call to the collectives",
                    0, $ad{$gid}{size}, @inactive );
                $gstr = "";
            }
        }
    }

    my $count = keys(%ad);

    if ( $count == 1 ) {
        my $use_str = ( $i_count == 1 ) ? "" : " not";
        $ret .= "Total: $count group which is$use_str in use.\n";
    } else {
        my $i_str = ( $i_count == 1 ) ? "is" : "are";
        $ret .= "Total: $count groups of which $i_count $i_str in use.\n";
    }

    return "$ret";

    # print Dumper \%ad;
}

sub deadlock_detect {
    my ( $handle, $lines ) = @_;
    my $data;

    # XXX This is a bit of a hack to make the deadlock
    # code work with input files, the whole thing is due
    # a tidy-up on the full-duplex branch where this should
    # be solved properly.
    if ( defined $lines->{raw} ) {
        $data = $lines->{raw};
    } else {
        $data = $lines->{lines};
    }

    # print Dumper $data;
    my %coll_data;
    foreach my $rank ( keys( %{$data} ) ) {
        my $r = $data->{$rank};
        my %lid;
        foreach my $line ( @{$r} ) {
            if ( $line =~ /^comm(\d+): (\w+): \'(.*)\'$/ ) {
                $lid{$1}{$2} = $3;
            } elsif ( $line =~ /^comm(\d+): Rank: local (\d+) global (\d+)$/ ) {
                $lid{$1}{ranks}{$2} = $3;
            } elsif ( $line =~
/^comm(\d+): Collective \'(\w+)\': call count (\d+), ([not ]*)active$/
              )
            {
                $lid{$1}{coll}{$2}{count} = $3;
                if ( $4 eq "" ) {
                    $lid{$1}{coll}{$2}{active} = 1;
                }
            } elsif ( $line =~ /^msg\d+/ ) {
                ;    # nop
            } else {
                print("Failed to match minfo output: $line\n");
            }
        }
        $coll_data{$rank} = \%lid;
    }

    # print Dumper \%coll_data;

    my $r = go_deadlock_detect \%coll_data;
    print $r;
}

# For reference the other interesting options here are these two.
# "-stack-list-arguments 1"
# "-stack-list-locals 2"
sub gdb_dump_frames {
    my ( $gdb, $detail ) = @_;
    my %result = gdb_n_send( $gdb, "-stack-list-frames" );
    my $data = gdb_parse_reason( $result{reason}, "frame" );
    if ( not defined $data->{stack} ) {
        return ( { error => $data->{msg} || "unknown error" } );
    }
    if ( defined $detail ) {
        foreach my $frame ( @{ $data->{stack} } ) {
            my %r = gdb_n_send( $gdb,
                "-stack-list-arguments 0 $frame->{level} $frame->{level}" );
            my $args = gdb_parse_reason( $r{reason}, "name" );

            my @all;
            if ( defined $args->{"stack-args"}[0]{frame}{args} ) {
                my @names = @{ $args->{"stack-args"}[0]{frame}{args} };
                @{ $frame->{params} } = @names;
                push @all, (@names);
            }

            gdb_send( $gdb, "-stack-select-frame $frame->{level}" );
            my %s = gdb_n_send( $gdb, "-stack-list-locals 0" );
            if ( $s{status} eq "done" ) {
                my $args = gdb_parse_reason( $s{reason}, "name" );
                if ( defined $args->{locals} ) {
                    @{ $frame->{locals} } = @{ $args->{locals} };
                    push @all, ( @{ $args->{locals} } );
                }
            }

            foreach my $name (@all) {
                my %t = gdb_n_send( $gdb, "-data-evaluate-expression $name" );
                if ( $t{status} eq "done" ) {
                    my $v = gdb_parse_reason( $t{reason} );
                    $frame->{vals}{$name} = $v->{value};
                }
            }
        }
    }
    return @{ $data->{stack} };
}

sub gdb_dump_frames_per_thread {
    my ( $gdb, $detail ) = @_;
    my @th = ();
    my %result = gdb_n_send( $gdb, "-thread-list-ids" );
    if ( $result{status} ne "done" ) {
        return ("unknown error");
    }
    my $data = gdb_parse_reason( $result{reason}, "thread-ids" );
    if ( not defined $data->{"thread-ids"} ) {
        return ( { error => $data->{msg} || "unknown error" } );
    }
    if ( $data->{"number-of-threads"} == 0 ) {
        my %t;
        $t{id} = 0;
        @{ $t{frames} } = gdb_dump_frames( $gdb, $detail );
        push( @th, \%t );
        return @th;
    }
    foreach my $thread ( @{ $data->{"thread-ids"} } ) {
        my $id = $thread->{"thread-id"};
        my %t;
        $t{id} = $id;
        gdb_send( $gdb, "-thread-select $id" );
        @{ $t{frames} } = gdb_dump_frames( $gdb, $detail );
        push( @th, \%t );
    }
    return @th;
}

# I'm not sure what this is trying to do.
# sub gdb_try_args {
#    my ( $gdb, @frames ) = @_;
#    my %result = gdb_n_send( $gdb, "-stack-list-arguments 0" );
#
#    my $result = $result{reason};
#
#    my ($stack) = ( $result =~ /stack=\[(.+)\]/ );
#    for ( ( $stack =~ /frame=\{([^\}]+)\}/g ) ) {
#        my %d = ();
#        s/\"//g;    #"
#        map { $d{ $$_[0] } = $$_[1] } map { [ split /=/ ] } split(/,/);
#
#        push( @frames, \%d );
#    }
#
#    return @frames;
#}

sub gdb_next_result {
    my ($gdb) = shift;
    my $handle = $gdb->{rdr};

    while (<$handle>) {
        return "" if /^\(gdb\)/;
        return $_ if /^\^(done|error)/;
    }
    return "";
}

sub gdb_int_from_raw {
    my $str = shift;
    if ( $str =~ /\$\d+ \= (\d)+/ ) {
        return $1;
    }
}

sub stack_from_core {
    my $exe  = shift;
    my $core = shift;

    my $gdb = gdb_start( $exe, $core );

    my %e = gdb_n_next_result($gdb);

    my $r = $e{raw};

    $r =~ s/\\n/\n/g;
    $r =~ s/\\"/\"/g;    #"
    $r =~ s/\\\\/\\/g;

    my @r = split( "\n", $r );

    foreach my $l ( split( "\n", $r ) ) {
        next if ( $l =~ m/^done/ );
        next if ( $l =~ m/^Loaded/ );
        next if ( $l =~ m/^Reading/ );
        next if ( $l =~ m/^Using/ );
        print "$l\n";
    }

    # Send a invalid command so the wait_for_prompt in dump_frames... can work.
    # Should probably do this in gdb_start() and return the output somehow.
    my $handle = $gdb->{wtr};
    print $handle "\n";

    print "\n";

    my @threads;
    if ( $conf{"stack-shows-params"} ) {
        @threads = gdb_dump_frames_per_thread( $gdb, 1 );
    } else {
        @threads = gdb_dump_frames_per_thread($gdb);
    }

  # This code is (almost 100%) lifted from stack_trace_from_pids, could probably
  # factor it out into it's own helper function.
    foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
        my @frames = @{ $thread->{frames} };

        printf("ThreadId: $thread->{id}\n") if ( $#threads != 0 );

        for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
            my $frame = $frames[$i];

            printf("ERROR: $$frame{error}\n")
              if exists $$frame{error};

            next unless exists $$frame{level};
            next unless exists $$frame{func};

            if ( $conf{"stack-shows-params"} ) {
                my @a;
                foreach my $arg ( @{ $frame->{params} } ) {
                    if ( defined $frame->{vals}{$arg} ) {
                        push( @a, "$arg = $frame->{vals}{$arg}" );
                    } else {
                        push( @a, "$arg = ??" );
                    }
                }
                my $a = join( ", ", @a );
                my $file = $frame->{file} || "?";
                my $line = $frame->{line} || "?";
                printf("$frame->{func}($a) at $file:$line\n");

                if ( $conf{"stack-shows-locals"} ) {
                    foreach my $arg ( @{ $frame->{locals} } ) {
                        if ( defined $frame->{vals}{$arg} ) {
                            printf("  $arg = $frame->{vals}{$arg}\n");
                        } else {
                            printf("  $arg = ??\n");
                        }
                    }
                }
            } else {
                printf( ( $$frame{func} || "?" ) 
                    . "() at "
                      . ( $$frame{file} || "?" ) . ":"
                      . ( $$frame{line} || "?" )
                      . "\n" );
            }
        }
    }

    gdb_quit($gdb);
}

sub run_ptrack_cmd {
    my ( $vp, $pid, $cmd ) = @_;

    debug $vp, "running (p) $cmd";
    my $lines = 0;

    kill( "CONT", $pid );
    open( CMD, "$cmd 2>/dev/null|" )
      || p_die( $vp, "cant start command $cmd" );
    while (<CMD>) {
        chomp $_;
        output $vp, $_;
        $lines++;
    }
    kill( "CONT", $pid );
    close CMD;
    return $lines;
}

sub run_command {
    my ( $vp, $cmd ) = @_;
    debug $vp, "running $cmd";
    open( CMDS, "$cmd|" ) || p_die $vp, "cant fork subcommand";
    while (<CMDS>) {
        chomp $_;
        output $vp, $_;
    }
    close CMDS;
    debug $vp, "Finished $cmd";
}

sub get_remote_env {
    my $pid = shift;

    my %env;

    local $/ = "\0";
    open( FD, "/proc/$pid/environ" ) or return undef;
    while (<FD>) {
        chomp;
        my @f   = split "=";
        my $key = $f[0];
        shift @f;
        $env{$key} = join( "=", @f );
    }
    close FD;
    return %env;
}

# Load the data about a given RMS job id,
# return a array of hashes
sub load_rms_procs {
    my $jobId = shift;

    if ( not open PIDFILE, "/proc/rms/programs/$jobId/pids" ) {

        # This is actually perfectly legitimate, it's because you
        # can do for example allocate -N4 prun -N2 <app>.  Because
        # of the way prun -T works (across a resource) not having
        # a pids file isn't always a bad thing.
        #
        # Of course it could mean that whatever jobs were supposed
        # to be running on this node aren't.
        debug undef, "Cannot open /proc/rms/programs/$jobId/pids";
        return;
    }

    my @procs;

    while (<PIDFILE>) {
        my ( $pid, $vp ) = split(' ');
        my %process;
        $process{pid} = $pid;
        if ( defined $vp and $vp != -1 ) {

            # Modern versions of RMS do the pid to vp translation for
            # us but report all unknown pids as -1.  Unknown in this
            # case means the rmsloaders and any processes which haven't
            # called elan_baseInit()
            $process{vp} = $vp;
        }
        push @procs, \%process;
    }
    close(PIDFILE);
    return @procs;
}

sub get_rms_jobid {
    my $slurmid = shift;
    my $rmsid   = 0;
    my $rmsdir  = "/proc/rms/programs";

    return undef unless ( -d $rmsdir );

    # This is a bit odd and isn't well tested (I don't have access to slurm)
    # We have been given the slurm ID on the command line and need to convert
    # this to a RMS id (as the kernel module sees it).
    # For each active RMS job on the node check if this pid translates to
    # a slurm pid for the job we have;

    my %gids = slurm_get_ids($slurmid);

    opendir( DIR, $rmsdir ) or die "Unable to open $rmsdir: $!\n";
    my @ids = readdir(DIR);
    closedir(DIR);

    for my $id (@ids) {
        next unless ( $id =~ /^\d+$/ );

        my @pids = load_rms_procs($id);
        next unless @pids;

        my $self    = $$;
        my $is_self = 0;
        foreach my $pid (@pids) {
            if ( $self == $pid->{pid} ) {
                $is_self = 1;
            }
        }
        next if ($is_self);
        my $pid = $pids[0]->{pid};

        # Best way, if this pid is in the target slurm job.
        return $id if defined $gids{$pid};

        my %env = get_remote_env($pid);

        # Discard this RMS job if it's the wrong job-step.
        next
          if ( defined $env{SLURM_STEPID}
            and $env{SLURM_STEPID} ne $confInner{"slurm-job-step"} );

        # The prefered although not perfect way...
        if ( $env{SLURM_JOBID} eq $slurmid ) {
            return $id;
        }

        # Seems to be a legacy option no longer used.
        # `scontrol pid2jobid $pid 2>&1` =~ m/id (\d+) /;
        # return $id if ( $1 == $slurmid );
    }

    return undef;
}

sub show_task_file {
    my ( $vp, $file, $prefix ) = @_;
    if ( defined $prefix ) {
        $prefix = "$prefix: ";
    } else {
        $prefix = "";
    }
    return unless ( -f $file );
    open( FD, "$file" ) or return;
    my @all = <FD>;
    close FD;
    foreach my $l (@all) {
        chomp $l;
        output( $vp, "$prefix$l" );
    }
}

sub show_task_stat_file {
    my ( $vp, $file, $prefix ) = @_;
    if ( defined $prefix ) {
        $prefix = "$prefix";
    } else {
        $prefix = "";
    }
    my @stat_names =
      qw(pid comm state ppid pgrp session tty_nr tpgid flags minflt
      cminflt majflt cmajflt utime stime cutime cstime priority nice
      num_threads itrealvalue starttime vsize rss rlim startcode endcode
      startstack kstkesp kstkeip signal blocked sigignore sigcatch wchan
      nswap cnswap exit_signal processor rt_ptiority policy
      delayacct_blkio_ticks guest_time cguest_time);
    return unless ( -f $file );
    open( FD, "$file" ) or return;
    my @all = <FD>;
    close FD;

    foreach my $l (@all) {
        chomp $l;
        my @stats = split( / /, $l );
        for ( my $i = 0 ; $i <= $#stats ; $i++ ) {
            output( $vp, "$prefix.$stat_names[$i]: @stats[$i]" );
        }

    }
}

sub show_task_dir {
    my ( $vp, $pid, $dir ) = @_;

    if ( $confInner{"proc-shows-proc"} ) {
        my $exe = readlink "$dir/exe";
        if ( defined $exe ) {
            output $vp, "exe:$exe";
        }

        # pcpu is calculated from /proc elsewhere.
        # This isn't either, ps reports time
        # as a percentage since the program started so
        # isn't live as the top-reported figure is.

        #my $pcpu = `ps --pid $pid -o pcpu= 2>/dev/null`;
        #chomp($pcpu);
        #if ( $pcpu != "" ) {
        #    output( $vp, "pcpu:$pcpu%" );
        #}

        show_task_file( $vp, "$dir/status" );
        show_task_file( $vp, "$dir/wchan", "wchan" );
        show_task_file( $vp, "$dir/stat", "stat" );
        if (   $confInner{"proc-shows-stat"}
            or $confInner{mode} eq "proc-summary" )
        {
            show_task_stat_file( $vp, "$dir/stat", "stat" );
        }

        if ( -f "$dir/maps" ) {
            open( MAP, "$dir/maps" );
            my @map = (<MAP>);
            close(MAP);
            my %totals;
            foreach my $rgn (@map) {
                my ( $area, $perm, $offset, $time, $inode, $file ) =
                  split( " ", $rgn );
                if ( $file =~ '/dev/elan4/sdram(\d+)' ) {
                    my $rail = $1;
                    my ( $start, $end ) = split( "-", $area );
                    my $s     = _hex("0x$start");
                    my $e     = _hex("0x$end");
                    my $delta = $e - $s;
                    if ( defined $totals{$rail} ) {
                        $totals{$rail} += $delta;
                    } else {
                        $totals{$rail} = $delta;
                    }
                }
            }
            foreach my $rail ( sort keys %totals ) {
                my $total = $totals{$rail} / 1024;
                output( $vp, "sdram$rail: $total kb" );
            }
        }
    }
    if ( $confInner{"proc-shows-fds"} ) {
        opendir( FDS, "$dir/fd" );
        my @fds = readdir(FDS);
        closedir(FDS);
        my @all_fddata;
        foreach my $fd (@fds) {
            next if ( $fd eq "." );
            next if ( $fd eq ".." );
            my $target = readlink "$dir/fd/$fd";

            my %fdhash;
            $fdhash{target} = $target;
            $fdhash{fd}     = $fd;

            # New fdinfo data, it's verbose so only enable it
            # if requested by -O proc-shows-fds=full
            if ( $confInner{"proc-shows-fds"} eq "full" ) {
                if ( -f "$dir/fdinfo/$fd" ) {
                    open( FDI, "$dir/fdinfo/$fd" );
                    my @fdi = (<FDI>);
                    close FDI;
                    foreach my $fdi (@fdi) {
                        chomp($fdi);
                        my ( $key, $value ) = split( ":", $fdi );
                        $value =~ s/\t//g;
                        $fdhash{$key} = $value;
                    }
                }
            }
            push( @all_fddata, \%fdhash );
        }
        foreach my $fd (@all_fddata) {
            if ( defined $fd->{pos} ) {
                output( $vp,
                    "fd$fd->{fd}: $fd->{target} \($fd->{pos} $fd->{flags}\)" );
            } else {
                output( $vp, "fd$fd->{fd}: $fd->{target}" );
            }
        }
    }
    if ( $confInner{"proc-shows-maps"} ) {
        show_task_file( $vp, "$dir/maps", "maps" );
    }
}

# Convert the first line of /proc/stat to elapsed jiffies.
sub string_to_jiffies {
    my ($ps) = @_;

    my @usecc = split( " ", $ps );

    my $jiffies = 0;

    # Remove the "cpu" prefix.
    shift(@usecc);
    foreach my $usecv (@usecc) {
        $jiffies += $usecv;
    }
    return $jiffies;
}

sub add_and_divide_jiffies {
    my ( $pre, $post ) = @_;

    my $jiffies;

    my @pre = split( " ", $pre );

    return ( ( string_to_jiffies($pre) + string_to_jiffies($post) ) / 2 );
}

# Convert /proc/self/stat into used jiffies.
sub stat_to_jiffies {
    my $stat    = shift;
    my @values  = split( " ", $stat );
    my $jiffies = 0;
    $jiffies += $values[13];    # utime
    $jiffies += $values[14];    # stime
    return $jiffies;
}

sub show_proc_all {
    my ($list) = @_;

    my @all;

    foreach my $proc ( @{$list} ) {
        my $pid = $proc->{pid};
        open( $proc->{handle}, "/proc/$pid/stat" );
    }

    open( SFD, "/proc/stat\n" );

    # Begin critical path.
    my $stat = <SFD>;

    foreach my $proc ( @{$list} ) {
        my $pid = $proc->{pid};
        my $h   = $proc->{handle};
        $proc->{stat_start} = <$h>;
        seek( $proc->{handle}, 0, 0 );
    }

    seek( SFD, 0, 0 );
    my $stat2 = <SFD>;

    # End critical path.

    my $jiffies_start = add_and_divide_jiffies( $stat, $stat2 );

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};
        show_proc( $vp, $pid );
    }

    sleep(1);

    seek( SFD, 0, 0 );

    # Begin critical path.
    $stat = <SFD>;

    foreach my $proc ( @{$list} ) {
        my $pid = $proc->{pid};
        my $h   = $proc->{handle};
        $proc->{stat_end} = <$h>;
        close( $proc->{handle} );
    }

    seek( SFD, 0, 0 );
    $stat2 = <SFD>;

    # End critical path.

    my $cpucount = 0;
    while (<SFD>) {
        if ( $_ =~ /^cpu\d/ ) {
            $cpucount++;
        }
    }
    close(SFD);

    my $jiffies_end = add_and_divide_jiffies( $stat, $stat2 );

    my $elapsed = $jiffies_end - $jiffies_start;

    foreach my $proc ( @{$list} ) {
        my $vp       = $proc->{vp};
        my $jpre     = stat_to_jiffies( $proc->{stat_start} );
        my $jpost    = stat_to_jiffies( $proc->{stat_end} );
        my $jused    = $jpost - $jpre;
        my $used     = ( $jused / $elapsed ) * $cpucount * 100;
        my $used_str = sprintf( "%d", $used );

        output( $vp, "pcpu: $used_str" );
    }
}

sub show_proc {
    my ( $vp, $pid ) = @_;

    if ( $confInner{"proc-shows-proc"} ) {
        output( $vp, "hostname:$confInner{hostname}" );
    }

    if ( -d "/proc/$pid/task" and $confInner{"proc-shows-proc"} ) {

        # 2.6 kernel. (ntpl)
        opendir( DIR, "/proc/$pid/task" );
        my @tasks = readdir(DIR);
        closedir(DIR);
        foreach my $task (@tasks) {
            next if ( $task eq "." );
            next if ( $task eq ".." );
            show_task_dir( $vp, $pid, "/proc/$pid/task/$task" );
        }
    } else {
        show_task_dir( $vp, $pid, "/proc/$pid" );
    }
}

sub gdb_int_from_pid {
    my $pid = shift;
    my $var = shift;
    my $gdb = gdb_start();
    if ( not gdb_attach( $gdb, $pid ) ) {
        return;
    }

    # use data-evaluate-expression here?
    my %r = gdb_n_send( $gdb, "p $var" );
    my $nvp = gdb_int_from_raw( $r{raw} );
    gdb_detach($gdb);
    gdb_quit($gdb);
    return $nvp;
}

# Try and be clever here, attach to each and every process on this node first,
# then go back and query them each in turn, should mean that some processes are
# not spinning whilst gdb is doing it's thing which will mean a quicker runtime
# but also that the resulting stack traces will have less artifacts because running
# processes bunch up behind the non-running ones.
sub stack_trace_from_pids {
    my ($list) = @_;

    my @all;

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        debug $vp, "Attaching to $pid";
        my $gdb = gdb_start();
        kill( "CONT", $pid );
        if ( gdb_attach( $gdb, $pid ) ) {
            $proc->{gdb} = $gdb;
            push( @all, $proc );
        } else {
            output $vp, "Failed to attach to process";
        }

    }

    foreach my $proc (@all) {
        my $tries = 0;

        my @threads;

        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        my $ok;
        do {
            debug $vp, "try $tries";
            my $gdb;

            if ($tries) {
                debug $vp, "Re-attaching to $pid, $tries";
                kill( "CONT", $pid );
                sleep(1);
                my $g = gdb_start();
                if ( gdb_attach( $g, $pid ) ) {
                    $gdb = $g;
                } else {
                    output $vp, "Failed to attach to process";
                }
            } else {
                $gdb = $proc->{gdb};
            }

            $ok = 0;
            if ( defined $gdb ) {
                if ( $confInner{"stack-shows-params"} ) {
                    @threads = gdb_dump_frames_per_thread( $gdb, 1 );
                } else {
                    @threads = gdb_dump_frames_per_thread($gdb);
                }
                gdb_detach($gdb);
                gdb_quit($gdb);
                kill( "CONT", $pid );
                if ( defined $threads[0]->{frames} ) {
                    my @frames = @{ $threads[0]->{frames} };

                    $ok = 1;
                    $ok = 0
                      unless ( defined $frames[$#frames]{func}
                        and $frames[$#frames]{func} eq "main" );
                } else {
                    $ok = 0;
                }
            }
            $tries++;
          } while ( ( $ok != 1 )
            and ( $tries < $confInner{"gdb-retry-count"} ) );

        if ( not defined $threads[0]{id} ) {
            output( $vp, "Could not extract stack trace from application" );
            return;
        }

        if ( defined $threads[0]{error} ) {
            output( $vp, $threads[0]{error} );
            return;
        }

        foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
            next unless defined $thread->{frames};
            my @frames = @{ $thread->{frames} };

            output( $vp, "ThreadId: $thread->{id}" ) if ( $#threads != 0 );

            for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
                my $frame = $frames[$i];

                output( $vp, "ERROR: $$frame{error}" )
                  if exists $$frame{error};

                next unless exists $$frame{level};
                next unless exists $$frame{func};

                if ( $confInner{"stack-shows-params"} ) {
                    my @a;
                    foreach my $arg ( @{ $frame->{params} } ) {
                        if ( defined $frame->{vals}{$arg} ) {
                            push( @a, "$arg = $frame->{vals}{$arg}" );
                        } else {
                            push( @a, "$arg = ??" );
                        }
                    }
                    my $a = join( ", ", @a );
                    my $file = $frame->{file} || "?";
                    my $line = $frame->{line} || "?";
                    output( $vp, "$frame->{func}($a) at $file:$line" );

                    if ( $confInner{"stack-shows-locals"} ) {
                        foreach my $arg ( @{ $frame->{locals} } ) {
                            if ( defined $frame->{vals}{$arg} ) {
                                output( $vp, "  $arg = $frame->{vals}{$arg}" );
                            } else {
                                output( $vp, "  $arg = ??" );
                            }
                        }
                    }
                } else {
                    output( $vp,
                            ( $$frame{func} || "?" ) 
                          . "() at "
                          . ( $$frame{file} || "?" ) . ":"
                          . ( $$frame{line} || "?" ) );
                }
            }
        }
    }
}

sub kill_proc {
    my ( $vp, $pid ) = @_;
    my $signal = uc( $confInner{args}{signal} );
    kill( $signal, $pid );
}

sub show_queue {
    my ( $vp, $pid ) = @_;

    # Nobble the LD_LIBRARY_PATH to give etrace the best chance of working.
    my %remote_env = get_remote_env($pid);

    if ( defined $remote_env{LD_LIBRARY_PATH} ) {
        $ENV{"LD_LIBRARY_PATH"} =
          "$remote_env{LD_LIBRARY_PATH}:$confInner{myld}";
    }

    my $lines = run_ptrack_cmd( $vp, $pid,
        "$confInner{edb} --queues --pid=$pid $confInner{edbopt}" );

    return if ( $lines != 0 );

    show_mpi_queue( $vp, $pid );
}

sub show_clever_full_stack {
    my ( $vp, $pid ) = @_;

    my $gdb = gdb_start();
    kill( "CONT", $pid );
    if ( gdb_attach( $gdb, $pid ) ) {
        my @threads = gdb_dump_frames_per_thread( $gdb, 1 );
        gdb_detach($gdb);
        gdb_quit($gdb);
        kill( "CONT", $pid );

        foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
            my @frames = @{ $thread->{frames} };
            for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
                my $frame = $frames[$i];

                my @a;
                foreach my $arg ( @{ $frame->{params} } ) {
                    if ( defined $frame->{vals}{$arg} ) {
                        push( @a, "$arg = $frame->{vals}{$arg}" );
                    } else {
                        push( @a, "$arg = ??" );
                    }
                }
                my $a = join( ", ", @a );
                my $file = $frame->{file} || "?";
                my $line = $frame->{line} || "?";
                output( $vp, "$frame->{func}($a) at $file:$line" );

                my $show_locals = 0;
                if ($show_locals) {
                    foreach my $arg ( @{ $frame->{locals} } ) {
                        if ( defined $frame->{vals}{$arg} ) {
                            output( $vp, "  $arg = $frame->{vals}{$arg}" );
                        } else {
                            output( $vp, "  $arg = ??" );
                        }
                    }
                }
            }
        }
    }
}

sub show_full_stack {
    my ( $vp, $pid, $file ) = @_;
    run_ptrack_cmd( $vp, $pid, "gdb -batch -x $file -p $pid" );
}

sub show_full_stacks {
    my ($list) = @_;

    if (0) {

        # -x does this, just do what we used to.
        foreach my $proc ( @{$list} ) {
            show_clever_full_stack( $proc->{vp}, $proc->{pid} );
        }
        return;
    }

    my ( $fh, $file ) = tempfile("/tmp/padb.XXXXXXXX");
    print $fh "where full\n";
    print $fh "detach\n";
    close $fh;

    foreach my $proc ( @{$list} ) {
        show_full_stack( $proc->{vp}, $proc->{pid}, $file );
    }

    unlink($file);
}

sub set_debug {
    my ( $vp, $pid ) = @_;
    run_command( $vp,
"edb --key $confInner{key} --debug=$confInner{args}{dflag} --target-vp=$vp"
    );
}

my $mpi_watch_data = <<EOF;
Barrier,b,elan_gsync,elan_hgsync,PMPI_Barrier,MPI_Barrier,shmem_barrier
Broadcast,B,elan_hbcast,elan_bcast,PMPI_Bcast,MPI_Bcast,shmem_bcast
AllGather,G,PMPI_Allgather,MPI_ALLgather
Gather,g,elan_gather,PMPI_Gather,MPI_Gather
AllReduce,R,PMPI_Allreduce,MPI_Allreduce
Reduce,r,elan_reduce,PMPI_Reduce,MPI_Reduce
alltoall,a,elan_alltoall,PMPI_Alltoall,MPI_Alltoall
alltoalls,A,elan_alltoalls
wait,w,elan_wait
EOF

# Load a file for use in MPI_Watch.
sub mpi_watch_load {
    my $file = shift;

    # File is a csv file,
    # Name,c,function1,function2

    if ( defined $confInner{"mpi-watch-file"} ) {
        my %fns;
        my $f = $confInner{"mpi-watch-file"};
        open( MW, $f ) or return;
        my @d = (<MW>);
        close(MW);

        foreach my $mode (@d) {
            chomp $mode;
            my ( $name, $char, @fns ) = split( ",", $mode );
            $fns{names}{$name} = $char;
            foreach my $fn (@fns) {
                $fns{fns}{$fn} = $name;
            }
        }
        return \%fns;
    }

    my %fns;
    foreach my $mode ( split( "\n", $mpi_watch_data ) ) {
        chomp $mode;
        my ( $name, $char, @fns ) = split( ",", $mode );
        $fns{names}{$name} = $char;
        foreach my $fn (@fns) {
            $fns{fns}{$fn} = $name;
        }
    }
    return \%fns;
}

# Legend:
#
# u - unexpected messages
# U - unexpected and other messages
# s - send messages only
# r - receive messages only
# m - send and receive messages
# . - no messages, consuming CPU
# - - sleeping
#
# * - error.

sub mpi_watch {
    my ( $vp, $pid ) = @_;

    my @mq   = fetch_mpi_queue( $vp, $pid );
    my $sm   = 0;
    my $rm   = 0;
    my $um   = 0;
    my $good = ".";

    my %res;

    my $fns = mpi_watch_load();

    my $fnmode;
    my $fnreal;
    my $gdb = gdb_start();
    kill( "CONT", $pid );
    if ( gdb_attach( $gdb, $pid ) ) {
        my @threads = gdb_dump_frames_per_thread($gdb);
        gdb_detach($gdb);
        gdb_quit($gdb);
        kill( "CONT", $pid );

        foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
            my @frames = @{ $thread->{frames} };
            for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
                my $frame = $frames[$i];
                if ( defined $fns->{fns}{ $frame->{func} } ) {
                    $fnmode = $fns->{fns}{ $frame->{func} };
                    $fnreal = $frame->{func};
                    last;
                }
            }
        }
    }

    # $res{mq} = \@mq;
    if ( $#mq == 0 ) {
        $good = ",";
    } else {
        foreach my $o (@mq) {
            if ( $o =~ /Operation (\d)/ ) {
                my $type = $1;
                $sm++ if ( $type == 0 );
                $rm++ if ( $type == 1 );
                $um++ if ( $type == 2 );
            }
        }
    }

    my $mt = ( grep { $_ } ( $sm, $rm, $um ) );
    if ( $mt != 0 ) {
        my $mode = "*";

        if ($um) {
            $mode = "u";
            $mode = "U" if ( $mt != 1 );
        } else {
            if ( $mt == 1 ) {
                $mode = "s" if ($sm);
                $mode = "r" if ($rm);
            } else {
                $mode = "m";
            }
        }
        $res{state} = $mode;
        output( $vp, $mode );
        return \%res;
    }

    if ( defined $fnmode ) {
        $res{state} = "$fns->{names}{$fnmode} $fnreal  ";
        $res{state} = $fns->{names}{$fnmode};
        return \%res;
    }

    my $m = find_from_status( $pid, "State" );
    if ( $m eq "R" ) {
        $m = $good;
    } elsif ( $m eq "S" ) {
        $m = "-";
    } else {
        $m = "*";
    }
    output $vp, $m;
    $res{state} = $m;
    return \%res;
}

sub show_pid {
    my ( $vp, $pid ) = @_;

    debug( $vp, "Looking at $vp, pid: $pid" );

    if ( defined $allfns{ $confInner{mode} }{handler} ) {
        my $res = $allfns{ $confInner{mode} }{handler}( $vp, $pid );
        if ( defined $allfns{ $confInner{mode} }{out_handler} ) {
            output_dtype( $vp, $res );
        }
    } else {
        my %d;
        $d{pid} = $pid;
        $d{vp}  = $vp;
        push( @{ $confInner{"all-pids"} }, \%d );
    }
}

sub maybe_show_pid {
    my ( $vp, $pid ) = @_;

    debug( $vp, "maybe_show_pid vp $vp, pid: $pid" );

    if ( $#ranks == -1 ) {
        show_pid( $vp, $pid );
    }

    foreach my $rank (@ranks) {
        if ( $rank eq $vp ) {
            show_pid( $vp, $pid );
            return;
        }
    }
}

my %proc_data;

sub load_all_proc_info {
    my $cmd = "ps -eo pid,ppid,user,comm";
    open( PS, "$cmd|" );
    my @pids = (<PS>);
    close(PS);
    foreach my $pid (@pids) {
        my ( $pid, $ppid, $user, $cmd ) = split( " ", $pid );
        next if $pid eq "PID";
        $proc_data{$pid}{PPid} = $ppid;

        # $proc{$pid}{user} = $user;
        $proc_data{$pid}{Name} = $cmd;
    }
}

sub find_from_status {
    my $pid = shift;
    my $key = shift;

    if ( -f "/proc/version" ) {
        open( PCMD, "/proc/$pid/status" ) or return;
        while (<PCMD>) {
            my $l = $_;
            if ( $l =~ /$key:\t+(\w+)/ ) {
                close PCMD;
                return $1;
            }
        }
        close PCMD;
    } else {
        load_all_proc_info() if ( keys(%proc_data) eq 0 );
        return $proc_data{$pid}{$key} if ( defined $proc_data{$pid}{$key} );
    }
    return;
}

sub is_resmgr_process {
    my $pid  = shift;
    my $name = find_from_status( $pid, "Name" );
    my $mgrs = { 'rmsloader' => 1, 'slurmd' => 1, 'slurmstepd' => 1 };
    return 1 if ( defined $mgrs->{$name} );
}

sub is_pid_script {
    my $pid = shift;
    my $exe = readlink("/proc/$pid/exe");
    my $cmd;
    if ( defined $exe ) {
        $cmd = basename($exe);
    } else {
        $cmd = find_from_status( $pid, "Name" );
    }
    my %scripts;
    map { $scripts{$_}++ } split( ",", $confInner{"scripts"} );
    return 1 if ( defined $scripts{$cmd} );
    return 0;
}

sub is_desc_of_resmgr {
    my $resmgrs = shift;
    my $pid     = shift;

    my $ppid = find_from_status( $pid, "PPid" );

    while ( defined $ppid and $ppid != 1 ) {
        return 1 if ( defined $resmgrs->{$ppid} );
        $ppid = find_from_status( $ppid, "PPid" );
    }

    return 0;
}

sub vp_from_pid {
    my $gids    = shift;
    my $resmgrs = shift;
    my $pid     = shift;

    return $gids->{$pid} if ( defined $gids->{$pid} );

    my $ppid = find_from_status( $pid, "PPid" );

    while ( defined $ppid and $ppid != 1 ) {
        return $gids->{$ppid} if ( defined $gids->{$ppid} );
        return undef if ( defined $resmgrs->{$ppid} );
        $ppid = find_from_status( $ppid, "PPid" );
    }
    return undef;
}

sub slurm_get_ids {
    my $jobid = shift;

    my %gids;

    my @procs =
      `scontrol listpids $jobid.$confInner{"slurm-job-step"} 2>/dev/null`;
    return undef if ( $? != 0 );
    foreach my $proc (@procs) {
        my ( $pid, $job, $step, $local, $global ) = split( " ", $proc );
        next if ( $global eq "-" );
        next unless ( $job == $jobid );
        next unless ( $step == $confInner{"slurm-job-step"} );
        $gids{$pid} = $global;
    }
    return %gids;
}

# Do the right thing with slurm...
sub slurm_find_pids {
    my $jobid = shift;

    # Slurm has the concept of a "job" and a "job step" which are
    # roughly analogous to "resource" and "job" in RMS terms,
    # the difference being that steps within a job are counted
    # from 0 in slurm whereas there is a global job namespace in
    # RMS.
    # Therefore padb *has* to target slurm jobs as they have the only
    # globally unique identifier.  You can use
    # -Oslurm-job-step=<step> to target individual job steps within
    # a job however.

    # Modern slurm systems have a scontol listpids option which we use however
    # older systems require a little more legwork and aren't precise.

    # These are the key variables...
    # SLURM_JOBID=1234
    # SLURM_STEPID=0
    # RMS_RESOURCE=1234  (Not needed)
    # RMS_JOBID=5678

    # SLURM_JOBID
    # RMS_JOBID

    my %gids = slurm_get_ids($jobid);

    opendir( DIR, "/proc/" );
    my @pids = readdir(DIR);
    closedir(DIR);

    my %resmgr;    # All processes which are resource managers.

    foreach my $pid (@pids) {
        next unless ( $pid =~ /^\d+$/ );
        if ( is_resmgr_process($pid) ) {
            $resmgr{$pid} = find_from_status( $pid, "Name" );
        }
    }

    my %pjobs;     # All parallel jobs (children of resource managers).;

    foreach my $pid (@pids) {
        next unless ( $pid =~ /^\d+$/ );

        # Skip over this process unless it's spawned from a resource manager.
        next unless is_desc_of_resmgr( \%resmgr, $pid );

        my $script = is_pid_script($pid);

        my $vp;

        if (%gids) {
            $vp = vp_from_pid( \%gids, \%resmgr, $pid );
            debug $vp, "Found $vp from $pid using scontrol listpids";
        }

        if ( not defined $vp ) {
            my %env = get_remote_env($pid);

            debug undef,
"Checking slurm pid: $pid, job $env{SLURM_JOBID}, step $env{SLURM_STEPID}, proc $env{SLURM_PROCID}, script $script";
            debug undef,
"Checking  rms  pid: $pid, job $env{RMS_JOBID}, proc $env{RMS_PROCID}, script $script";

            if ( $env{SLURM_JOBID} eq $jobid ) {
                if ( $env{SLURM_STEPID} eq $confInner{"slurm-job-step"} ) {
                    $vp = $env{SLURM_PROCID};
                }
            } elsif ( $env{RMS_JOBID} eq $jobid ) {
                $vp = $env{RMS_PROCID};
            }
        }

        next unless ( defined $vp );

        # Ignore bash/sh/perl wrappers.
        next if $script;

        push( @{ $pjobs{$vp} }, $pid );

    }

    foreach my $vp ( keys(%pjobs) ) {

        # If there are multiple possible processes then target each of them,
        # this is possibly wrong and suggestions for handling this better are
        # welcome.
        foreach my $pid ( @{ $pjobs{$vp} } ) {
            maybe_show_pid( $vp, $pid );
        }
    }
}

# Local processes per node, i.e. no resource manager support.
sub local_find_pids {
    my $pid = shift;

# Hard-wire this to vp 0, probably not true but without the resource manager it's difficult
# to tell.  We should really use elan_base->state->vp here.
    my $vp = gdb_int_from_pid( $pid, "elan_base->state->vp" );
    if ( not defined $vp or $vp eq "" ) {
        $vp = 0;
    }

    maybe_show_pid( $vp, $pid );
}

sub mpd_find_pids {
    my $job = shift;
    my $d   = mpd_get_data();

    my $j = $d->{$job}{pids}{ $confInner{"hostname"} };

    foreach my $pid ( keys %{$j} ) {
        maybe_show_pid( $j->{$pid}, $pid );
    }
}

sub open_find_pids {
    my $job = shift;
    open_get_data( $confInner{"open-ps"} );
    my $hostname = hostname();

    foreach my $rank ( keys( %{ $open_jobs{$job}{ranks}{$hostname} } ) ) {
        maybe_show_pid( $rank, $open_jobs{$job}{ranks}{$hostname}{$rank} );
    }
}

sub rms_find_pids {
    my $jobid = shift;

    my %vps;

    my @procs = load_rms_procs($jobid);

    foreach my $proc (@procs) {

        my $vp = $proc->{vp};

        # With any luck we have a new RMS and vp is extracted from /proc
        # Otherwise try and pick it out of the environment in a sane way
        # if that fails report errors for any process with isn't rmsloader

        if ( defined $vp ) {
            debug $vp, "Maybe looking at vp: $vp, pid: $proc->{pid}";
        } else {
            debug undef, "Maybe looking at pid: $proc->{pid}";
        }

        # Strip or rmsloader and slurm[step]d;
        next if ( is_resmgr_process( $proc->{pid} ) );

        # If we aren't known to be the vp and we are not a direct descendant
        # of the resource manager then skip over to the next process.
        next if ( not defined $vp and not is_parent_resmgr( $proc->{pid} ) );

        my $found = "actual";

        if ( not defined $vp ) {
            $found = "likely";

            my %env = get_remote_env( $proc->{pid} );

            if ( defined $env{RMS_PROCID} ) {
                $vp = $env{RMS_PROCID};
            } elsif ( defined $env{SLURM_PROCID} ) {
                $vp = $env{SLURM_PROCID};
            } else {

                debug( undef,
                    "Could not extract vp for process, $proc->{pid} "
                      . ( readlink "/proc/$proc->{pid}/exe" ) );
                next;
            }
        }

        debug( $vp, "Found $found vp $vp, pid: $proc->{pid}" );

        push( @{ $vps{$vp}{$found} }, $proc->{pid} );
    }

    foreach my $vp ( keys %vps ) {
        if ( defined $vps{$vp}{actual} ) {
            foreach my $pid ( @{ $vps{$vp}{actual} } ) {
                maybe_show_pid( $vp, $pid );
            }
        } else {
            foreach my $pid ( @{ $vps{$vp}{likely} } ) {
                maybe_show_pid( $vp, $pid );
            }
        }
    }

}

sub inner_show_stats {
    my $jobid = shift;
    my $key   = ( $jobid << 9 ) - 1;
    run_command( undef,
        "$confInner{edb} --stats-raw --parallel --key=$key $confInner{edbopt}"
    );
}

sub inner_main {

    $confInner{"slurm-job-step"} = "0";
    $confInner{"verbose"}        = 0;
    $confInner{"edbopt"}         = "";
    $confInner{"rmgr"}           = "auto";
    $confInner{"edb"}            = find_edb();
    $confInner{"minfo"}          = find_minfo();
    $confInner{"open-ps"}        = "";

    # The different options this script can perform.  One (and only one) of
    # these must be set.
    my $stats;

    # Local vars to help with command line parsing
    my @config_options;
    my $line_formatted;
    my $jobid;

    my %optionhash = (
        "config-option|O=s" => \@config_options,
        "jobid=i"           => \$jobid,
        "line-formatted"    => \$line_formatted,
        "rank=i"            => \@ranks,
        "stats-full"        => \$stats,
        "verbose|v+"        => \$confInner{"verbose"}
    );

    my %config_hash;

    foreach my $arg ( keys %allfns ) {
        $optionhash{ $allfns{$arg}{arg} } = \$config_hash{$arg};
        foreach my $sec ( @{ $allfns{$arg}{secondary} } ) {
            $sec->{value} = $sec->{default};
            $optionhash{ $sec->{arg} } = \$sec->{value};
        }

        if ( defined $allfns{$arg}{options_i} ) {
            foreach my $o ( keys( %{ $allfns{$arg}{options_i} } ) ) {
                $confInner{$o} = $allfns{$arg}{options_i}{$o};
            }
        }
    }

    Getopt::Long::Configure("bundling");

    GetOptions(%optionhash) or die("could not parse options\n");

    my $mode;

    foreach my $arg ( keys %config_hash ) {
        next unless defined $config_hash{$arg};
        $mode = $arg;
    }

    $confInner{"mode"} = $mode;

    # Put the args in a hash so that they can be referenced by name.
    if ( defined $allfns{$mode}{secondary} ) {
        foreach my $sec ( @{ $allfns{$mode}{secondary} } ) {
            $confInner{"args"}{ $sec->{arg_long} } = $sec->{value};
        }
    }

    # Load all config options from the command line, unlike the outer
    # code we don't check them to be valid here, any set on the outer
    # command line are automatically passed on and they might not mean
    # anything to us so silently ignore them.
    foreach my $config_option (@config_options) {
        my @pairs = split( ",", $config_option );
        foreach my $pair (@pairs) {
            my ( $name, $val ) = split( "=", $pair );
            if ( not defined $confInner{$name} ) {
                debug undef, "Unknown option $name";
            }
            $confInner{$name} = $val;
        }
    }

    # Load some non user-modifiable data into conf now
    $confInner{"lineformatted"} = $line_formatted;
    $confInner{"hostname"}      = hostname();

    $confInner{"myld"} = $ENV{"LD_LIBRARY_PATH"};

    # $rjobid is used for accessing the stats on slurm
    # systems, on rms it's just the jobId but on combined
    # slurm/rms systems it's modifed to be the rms id
    # and the jobid is left as the slurm job id.
    my $rjobid = $jobid;
    if ( exists $ENV{"SLURM_PROCID"} ) {
        $rjobid = get_rms_jobid($jobid);
    }

    if ( defined $rjobid ) {
        $confInner{"key"} = ( $rjobid << 9 ) - 1;
    }

    if ($stats) {

        # Takes a RMS job id.
        inner_show_stats($rjobid);
        exit(0);
    }

    # Handle resource managers better, simply call a callback
    # as the outer does.
    # As usual there is a special case, on Slurm systems
    # running QsNet you can have the RMS kernel module loaded
    # and these need to be handled differently so deal with
    # them first and then go to the standard callback.

    if ( ( $confInner{rmgr} eq "slurm" ) and ( -d "/proc/rms" ) ) {

        # Takes a RMS job id.
        rms_find_pids($rjobid);
    } else {
        if ( not defined $rmgr{ $confInner{rmgr} }{find_pids} ) {
            printf("Error, rmgr $confInner{rmgr} has no find_pids callback\n");
            exit(1);
        }
        $rmgr{ $confInner{rmgr} }{find_pids}($jobid);
    }

    if ( defined $allfns{$mode}{handler_all} ) {

        $allfns{$mode}{handler_all}( $confInner{"all-pids"} );
    }

    exit(0);
}

###############################################################################
#
# Main.
#
###############################################################################

# Initialise (some of) the options which are common to both the
# inner and outer instances of padb.  Attempt to make it easy
# to add new options by keeping everything in one place.
#
# Additional work is needed to make this 100% consistent, some
# of these options have secondary options (e.g. --kill and --signal) and
# this isn't dealt with yet.
#
# stack_long has a special case later on which adds two extra handlers
# in the inner code, this could be replaced by prehandler and posthandler
# but it's the only code that needs it so far.

sub to_arg {
    my $arg = shift;
    my $res = "$arg->{arg_long}";
    if ( defined $arg->{arg_short} ) {
        $res .= "|$arg->{arg_short}";
    }
    if ( defined $arg->{type} ) {
        $res .= "=$arg->{type}";
    }
    return $res;
}

sub common_main {

    # Long command line option.
    $allfns{queue}{arg_long} = "message-queue";

    # Short command line option (optional).
    $allfns{queue}{arg_short} = "q";

    # Handler to be called for each vp, called with ($vp,$pid) on the
    # correct host for each vp.
    $allfns{queue}{handler} = \&show_queue;

    # Handler to be called in the outer when command line option is set.
    # $allfns{queue}{cmdline} = \&command_line_queue;

    # Output handlers,
    # If {out_handler} is set (to a function) assume $line_formatted and
    # call that fn with the output.
    # If {pre_out_handler} is set call this function once at start of day,
    # save it's return value and pass this to {out_handler} later.

    # Help text for this function.
    $allfns{queue}{help} = "Show the message queues";

    $allfns{kill} = {
        'handler'   => \&kill_proc,
        'arg_long'  => 'kill',
        'help'      => "Deliver signal to processes",
        'secondary' => [
            {
                'arg_long' => 'signal',
                'type'     => 's',
                'default'  => 'TERM'
            }
        ]
    };

    # There are a number of things to consider though, are there any output
    # filters that can be used with this function and are the args options
    # to the inner code or the output filter (or can they just be set for both)

    $allfns{mqueue} = {
        'handler'   => \&show_mpi_queue,
        'arg_long'  => 'mpi-queue',
        'arg_short' => 'Q',
        'help'      => "Show MPI message queues"
    };

    $allfns{deadlock} = {
        'handler_all' => \&show_mpi_queue_for_deadlock_all,
        'arg_long'    => 'deadlock',
        'arg_short'   => 'j',
        'help'        => "Run deadlock detection algorithm",
        'out_handler' => \&deadlock_detect,
    };

    $allfns{pinfo} = {
        'handler_all' => \&show_proc_all,
        'arg_long'    => 'proc-info',
        'help'        => "Show process information",
        'options_i'   => {
            "proc-shows-proc" => 1,
            "proc-shows-fds"  => 1,
            "proc-shows-maps" => 0,
            "proc-shows-stat" => 0
          }

    };

    $allfns{"proc-summary"} = {
        'handler_all' => \&show_proc_all,
        'arg_long'    => 'proc-summary',
        'help'        => "Show process information in top format",
        'options_i'   => { "column-seperator" => "  ", }

    };

    $allfns{stack} = {
        'handler_all' => \&stack_trace_from_pids,
        'arg_long'    => 'stack-trace',
        'arg_short'   => 'x',
        'help'        => "Show stack trace (see also -t)",
        'options_i'   => {
            "stack-shows-params" => 0,
            "stack-shows-locals" => 0,
            "gdb-retry-count"    => 3
        }
    };

    $allfns{stack_long} = {
        'handler_all' => \&show_full_stacks,
        'arg_long'    => 'stack-trace-full',
        'arg_short'   => 'X',
        'help'        => "Show long stack trace (with locals)",
    };

    $allfns{mpi_watch} = {
        'handler'         => \&mpi_watch,
        'arg_long'        => 'mpi-watch',
        'help'            => "Trace MPI programs",
        'pre_out_handler' => \&pre_mpi_watch,
        'out_handler'     => \&show_mpi_watch,
        'options_i'       => {
            "mpi-dll"        => "auto",
            "mpi-watch-file" => undef
        }
    };

    $allfns{set_debug} = {
        'handler'   => \&set_debug,
        'arg_long'  => 'set-debug',
        'arg_short' => 'D',
        'help'      => "Set debug flags (use --dflag=value)",
        'secondary' => [
            {
                'arg_long' => 'dflag',
                'type'     => 's',
                'default'  => '0'
            }
        ]
    };

    # Make a getopt string out of each of the optional options.
    foreach my $arg ( keys %allfns ) {
        $allfns{$arg}{arg} = to_arg( $allfns{$arg} );

        if ( defined $allfns{$arg}{secondary} ) {
            foreach my $sec ( @{ $allfns{$arg}{secondary} } ) {
                $sec->{arg} = to_arg($sec);
            }
        }
    }

}

# Now run some actual code.

common_main();

if ( $ARGV[0] eq "--inner" ) {
    shift @ARGV;
    inner_main();
} else {
    outer_main();
}

exit(0);
